From 30ca0e4322dcc9fd9b4ba9560e9245df81db8d7a Mon Sep 17 00:00:00 2001 From: Steve Kriss Date: Mon, 13 Apr 2020 14:53:58 -0600 Subject: [PATCH] split out collecting items from backing up items Signed-off-by: Steve Kriss --- pkg/backup/backup.go | 170 +++++++++++--- pkg/backup/item_backupper.go | 21 +- ...esource_backupper.go => item_collector.go} | 218 ++++++++---------- ...ckupper_test.go => item_collector_test.go} | 0 4 files changed, 247 insertions(+), 162 deletions(-) rename pkg/backup/{resource_backupper.go => item_collector.go} (60%) rename pkg/backup/{resource_backupper_test.go => item_collector_test.go} (100%) diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index b19687b8d..cef92bfb4 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -1,5 +1,5 @@ /* -Copyright 2017 the Velero contributors. +Copyright 2017, 2020 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -20,20 +20,27 @@ import ( "archive/tar" "compress/gzip" "context" + "encoding/json" "fmt" "io" + "io/ioutil" + "os" "path/filepath" "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" + kubeerrs "k8s.io/apimachinery/pkg/util/errors" - api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/client" "github.com/vmware-tanzu/velero/pkg/discovery" + "github.com/vmware-tanzu/velero/pkg/kuberesource" "github.com/vmware-tanzu/velero/pkg/plugin/velero" "github.com/vmware-tanzu/velero/pkg/podexec" "github.com/vmware-tanzu/velero/pkg/restic" @@ -49,7 +56,7 @@ const BackupFormatVersion = "1.1.0" // Backupper performs backups. type Backupper interface { - // Backup takes a backup using the specification in the api.Backup and writes backup and log data + // Backup takes a backup using the specification in the velerov1api.Backup and writes backup and log data // to the given writers. Backup(logger logrus.FieldLogger, backup *Request, backupFile io.Writer, actions []velero.BackupItemAction, volumeSnapshotterGetter VolumeSnapshotterGetter) error } @@ -160,11 +167,11 @@ func getResourceIncludesExcludes(helper discovery.Helper, includes, excludes []s // getNamespaceIncludesExcludes returns an IncludesExcludes list containing which namespaces to // include and exclude from the backup. -func getNamespaceIncludesExcludes(backup *api.Backup) *collections.IncludesExcludes { +func getNamespaceIncludesExcludes(backup *velerov1api.Backup) *collections.IncludesExcludes { return collections.NewIncludesExcludes().Includes(backup.Spec.IncludedNamespaces...).Excludes(backup.Spec.ExcludedNamespaces...) 
} -func getResourceHooks(hookSpecs []api.BackupResourceHookSpec, discoveryHelper discovery.Helper) ([]resourceHook, error) { +func getResourceHooks(hookSpecs []velerov1api.BackupResourceHookSpec, discoveryHelper discovery.Helper) ([]resourceHook, error) { resourceHooks := make([]resourceHook, 0, len(hookSpecs)) for _, s := range hookSpecs { @@ -179,7 +186,7 @@ func getResourceHooks(hookSpecs []api.BackupResourceHookSpec, discoveryHelper di return resourceHooks, nil } -func getResourceHook(hookSpec api.BackupResourceHookSpec, discoveryHelper discovery.Helper) (resourceHook, error) { +func getResourceHook(hookSpec velerov1api.BackupResourceHookSpec, discoveryHelper discovery.Helper) (resourceHook, error) { h := resourceHook{ name: hookSpec.Name, namespaces: collections.NewIncludesExcludes().Includes(hookSpec.IncludedNamespaces...).Excludes(hookSpec.ExcludedNamespaces...), @@ -204,7 +211,7 @@ type VolumeSnapshotterGetter interface { } // Backup backs up the items specified in the Backup, placing them in a gzip-compressed tar file -// written to backupFile. The finalized api.Backup is written to metadata. Any error that represents +// written to backupFile. The finalized velerov1api.Backup is written to metadata. Any error that represents // a complete backup failure is returned. Errors that constitute partial failures (i.e. failures to // back up individual resources that don't prevent the backup from continuing to be processed) are logged // to the backup log. @@ -242,7 +249,7 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req backupRequest.BackedUpItems = map[itemKey]struct{}{} podVolumeTimeout := kb.resticTimeout - if val := backupRequest.Annotations[api.PodVolumeOperationTimeoutAnnotation]; val != "" { + if val := backupRequest.Annotations[velerov1api.PodVolumeOperationTimeoutAnnotation]; val != "" { parsed, err := time.ParseDuration(val) if err != nil { log.WithError(errors.WithStack(err)).Errorf("Unable to parse pod volume timeout annotation %s, using server value.", val) @@ -262,41 +269,146 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req } } - pvcSnapshotTracker := newPVCSnapshotTracker() - newItemBackupper := func() ItemBackupper { - itemBackupper := &defaultItemBackupper{ - backupRequest: backupRequest, - tarWriter: tw, - dynamicFactory: kb.dynamicFactory, - discoveryHelper: kb.discoveryHelper, - resticBackupper: resticBackupper, - resticSnapshotTracker: pvcSnapshotTracker, - volumeSnapshotterGetter: volumeSnapshotterGetter, - itemHookHandler: &defaultItemHookHandler{ - podCommandExecutor: kb.podCommandExecutor, - }, - } - itemBackupper.additionalItemBackupper = itemBackupper - - return itemBackupper + // set up a temp dir for the itemCollector to use to temporarily + // store items as they're scraped from the API. 
+ tempDir, err := ioutil.TempDir("", "") + if err != nil { + return errors.Wrap(err, "error creating temp dir for backup") } + defer os.RemoveAll(tempDir) - resourceBackupper := &resourceBackupper{ + collector := &itemCollector{ log: log, backupRequest: backupRequest, discoveryHelper: kb.discoveryHelper, dynamicFactory: kb.dynamicFactory, cohabitatingResources: cohabitatingResources(), - newItemBackupper: newItemBackupper, + dir: tempDir, } - resourceBackupper.backupAllGroups() + items := collector.getAllItems() + log.WithField("progress", "").Infof("Collected %d items matching the backup spec from the Kubernetes API (actual number of items backed up may be more or less depending on velero.io/exclude-from-backup annotation, plugins returning additional related items to back up, etc.)", len(items)) + + itemBackupper := &itemBackupper{ + backupRequest: backupRequest, + tarWriter: tw, + dynamicFactory: kb.dynamicFactory, + discoveryHelper: kb.discoveryHelper, + resticBackupper: resticBackupper, + resticSnapshotTracker: newPVCSnapshotTracker(), + volumeSnapshotterGetter: volumeSnapshotterGetter, + itemHookHandler: &defaultItemHookHandler{ + podCommandExecutor: kb.podCommandExecutor, + }, + } + + backedUpGroupResources := map[schema.GroupResource]bool{} + + for i, item := range items { + log.WithFields(map[string]interface{}{ + "progress": "", + "resource": item.groupResource.String(), + "namespace": item.namespace, + "name": item.name, + }).Infof("Processing item %d of %d", i+1, len(items)) + + // use an anonymous func so we can defer-close/remove the file + // as soon as we're done with it + func() { + var unstructured unstructured.Unstructured + + f, err := os.Open(item.path) + if err != nil { + log.WithError(errors.WithStack(err)).Error("Error opening file containing item") + return + } + defer f.Close() + defer os.Remove(f.Name()) + + if err := json.NewDecoder(f).Decode(&unstructured); err != nil { + log.WithError(errors.WithStack(err)).Error("Error decoding JSON from file") + return + } + + if backedUp := kb.backupItem(log, item.groupResource, itemBackupper, &unstructured, item.preferredGVR); backedUp { + backedUpGroupResources[item.groupResource] = true + } + }() + } + + // back up CRD for resource if found. We should only need to do this if we've backed up at least + // one item for the resource and IncludeClusterResources is nil. If IncludeClusterResources is false + // we don't want to back it up, and if it's true it will already be included. 
+ if backupRequest.Spec.IncludeClusterResources == nil { + for gr := range backedUpGroupResources { + kb.backupCRD(log, gr, itemBackupper) + } + } + + log.WithField("progress", "").Infof("Backed up a total of %d items", len(backupRequest.BackedUpItems)) return nil } +func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource) bool { + backedUpItem, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR) + if aggregate, ok := err.(kubeerrs.Aggregate); ok { + log.WithField("name", unstructured.GetName()).Infof("%d errors encountered backup up item", len(aggregate.Errors())) + // log each error separately so we get error location info in the log, and an + // accurate count of errors + for _, err = range aggregate.Errors() { + log.WithError(err).WithField("name", unstructured.GetName()).Error("Error backing up item") + } + + return false + } + if err != nil { + log.WithError(err).WithField("name", unstructured.GetName()).Error("Error backing up item") + return false + } + return backedUpItem +} + +// backupCRD checks if the resource is a custom resource, and if so, backs up the custom resource definition +// associated with it. +func (kb *kubernetesBackupper) backupCRD(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper) { + crdGroupResource := kuberesource.CustomResourceDefinitions + + log.Debugf("Getting server preferred API version for %s", crdGroupResource) + gvr, apiResource, err := kb.discoveryHelper.ResourceFor(crdGroupResource.WithVersion("")) + if err != nil { + log.WithError(errors.WithStack(err)).Errorf("Error getting resolved resource for %s", crdGroupResource) + return + } + log.Debugf("Got server preferred API version %s for %s", gvr.Version, crdGroupResource) + + log.Debugf("Getting dynamic client for %s", gvr.String()) + crdClient, err := kb.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), apiResource, "") + if err != nil { + log.WithError(errors.WithStack(err)).Errorf("Error getting dynamic client for %s", crdGroupResource) + return + } + log.Debugf("Got dynamic client for %s", gvr.String()) + + // try to get a CRD whose name matches the provided GroupResource + unstructured, err := crdClient.Get(gr.String(), metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + // not found: this means the GroupResource provided was not a + // custom resource, so there's no CRD to back up. + log.Debugf("No CRD found for GroupResource %s", gr.String()) + return + } + if err != nil { + log.WithError(errors.WithStack(err)).Errorf("Error getting CRD %s", gr.String()) + return + } + log.Infof("Found associated CRD %s to add to backup", gr.String()) + + kb.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr) +} + func (kb *kubernetesBackupper) writeBackupVersion(tw *tar.Writer) error { - versionFile := filepath.Join(api.MetadataDir, "version") + versionFile := filepath.Join(velerov1api.MetadataDir, "version") versionString := fmt.Sprintf("%s\n", BackupFormatVersion) hdr := &tar.Header{ diff --git a/pkg/backup/item_backupper.go b/pkg/backup/item_backupper.go index 8cd036a16..c630b9a05 100644 --- a/pkg/backup/item_backupper.go +++ b/pkg/backup/item_backupper.go @@ -43,12 +43,8 @@ import ( "github.com/vmware-tanzu/velero/pkg/volume" ) -// ItemBackupper can back up individual items to a tar writer. 
-type ItemBackupper interface { - backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource) (bool, error) -} - -type defaultItemBackupper struct { +// itemBackupper can back up individual items to a tar writer. +type itemBackupper struct { backupRequest *Request tarWriter tarWriter dynamicFactory client.DynamicFactory @@ -58,7 +54,6 @@ type defaultItemBackupper struct { volumeSnapshotterGetter VolumeSnapshotterGetter itemHookHandler itemHookHandler - additionalItemBackupper ItemBackupper snapshotLocationVolumeSnapshotters map[string]velero.VolumeSnapshotter } @@ -66,7 +61,7 @@ type defaultItemBackupper struct { // namespaces IncludesExcludes list. // In addition to the error return, backupItem also returns a bool indicating whether the item // was actually backed up. -func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource) (bool, error) { +func (ib *itemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource) (bool, error) { metadata, err := meta.Accessor(obj) if err != nil { return false, err @@ -284,7 +279,7 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim // backupPodVolumes triggers restic backups of the specified pod volumes, and returns a list of PodVolumeBackups // for volumes that were successfully backed up, and a slice of any errors that were encountered. -func (ib *defaultItemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *corev1api.Pod, volumes []string) ([]*velerov1api.PodVolumeBackup, []error) { +func (ib *itemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *corev1api.Pod, volumes []string) ([]*velerov1api.PodVolumeBackup, []error) { if len(volumes) == 0 { return nil, nil } @@ -297,7 +292,7 @@ func (ib *defaultItemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *co return ib.resticBackupper.BackupPodVolumes(ib.backupRequest.Backup, pod, volumes, log) } -func (ib *defaultItemBackupper) executeActions( +func (ib *itemBackupper) executeActions( log logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, @@ -349,7 +344,7 @@ func (ib *defaultItemBackupper) executeActions( return nil, errors.WithStack(err) } - if _, err = ib.additionalItemBackupper.backupItem(log, additionalItem, gvr.GroupResource(), gvr); err != nil { + if _, err = ib.backupItem(log, additionalItem, gvr.GroupResource(), gvr); err != nil { return nil, err } } @@ -360,7 +355,7 @@ func (ib *defaultItemBackupper) executeActions( // volumeSnapshotter instantiates and initializes a VolumeSnapshotter given a VolumeSnapshotLocation, // or returns an existing one if one's already been initialized for the location. -func (ib *defaultItemBackupper) volumeSnapshotter(snapshotLocation *velerov1api.VolumeSnapshotLocation) (velero.VolumeSnapshotter, error) { +func (ib *itemBackupper) volumeSnapshotter(snapshotLocation *velerov1api.VolumeSnapshotLocation) (velero.VolumeSnapshotter, error) { if bs, ok := ib.snapshotLocationVolumeSnapshotters[snapshotLocation.Name]; ok { return bs, nil } @@ -394,7 +389,7 @@ const ( // takePVSnapshot triggers a snapshot for the volume/disk underlying a PersistentVolume if the provided // backup has volume snapshots enabled and the PV is of a compatible type. 
Also records cloud // disk type and IOPS (if applicable) to be able to restore to current state later. -func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.FieldLogger) error { +func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.FieldLogger) error { log.Info("Executing takePVSnapshot") if boolptr.IsSetToFalse(ib.backupRequest.Spec.SnapshotVolumes) { diff --git a/pkg/backup/resource_backupper.go b/pkg/backup/item_collector.go similarity index 60% rename from pkg/backup/resource_backupper.go rename to pkg/backup/item_collector.go index 2454c9952..2fbf51587 100644 --- a/pkg/backup/resource_backupper.go +++ b/pkg/backup/item_collector.go @@ -17,18 +17,17 @@ limitations under the License. package backup import ( + "encoding/json" + "io/ioutil" "sort" "strings" "github.com/pkg/errors" "github.com/sirupsen/logrus" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - kubeerrs "k8s.io/apimachinery/pkg/util/errors" "github.com/vmware-tanzu/velero/pkg/client" "github.com/vmware-tanzu/velero/pkg/discovery" @@ -36,36 +35,49 @@ import ( "github.com/vmware-tanzu/velero/pkg/util/collections" ) -// resourceBackupper collects resources from the Kubernetes API according to -// the backup spec and passes them to an itemBackupper to be backed up. -type resourceBackupper struct { +// itemCollector collects items from the Kubernetes API according to +// the backup spec and writes them to files inside dir. +type itemCollector struct { log logrus.FieldLogger backupRequest *Request discoveryHelper discovery.Helper dynamicFactory client.DynamicFactory cohabitatingResources map[string]*cohabitatingResource - newItemBackupper func() ItemBackupper + dir string } -// collect backs up all API groups. -func (r *resourceBackupper) backupAllGroups() { +type kubernetesResource struct { + groupResource schema.GroupResource + preferredGVR schema.GroupVersionResource + namespace, name, path string +} + +// getAllItems gets all relevant items from all API groups. +func (r *itemCollector) getAllItems() []*kubernetesResource { + var resources []*kubernetesResource for _, group := range r.discoveryHelper.Resources() { - if err := r.backupGroup(r.log, group); err != nil { - r.log.WithError(err).WithField("apiGroup", group.String()).Error("Error backing up API group") + groupItems, err := r.getGroupItems(r.log, group) + if err != nil { + r.log.WithError(err).WithField("apiGroup", group.String()).Error("Error collecting resources from API group") + continue } + + resources = append(resources, groupItems...) } + + return resources } -// backupGroup backs up a single API group. -func (r *resourceBackupper) backupGroup(log logrus.FieldLogger, group *metav1.APIResourceList) error { +// getGroupItems collects all relevant items from a single API group. 
+func (r *itemCollector) getGroupItems(log logrus.FieldLogger, group *metav1.APIResourceList) ([]*kubernetesResource, error) { log = log.WithField("group", group.GroupVersion) - log.Infof("Backing up group") + log.Infof("Getting items for group") // Parse so we can check if this is the core group gv, err := schema.ParseGroupVersion(group.GroupVersion) if err != nil { - return errors.Wrapf(err, "error parsing GroupVersion %q", group.GroupVersion) + return nil, errors.Wrapf(err, "error parsing GroupVersion %q", group.GroupVersion) } if gv.Group == "" { // This is the core group, so make sure we process in the following order: pods, pvcs, pvs, @@ -73,35 +85,38 @@ func (r *resourceBackupper) backupGroup(log logrus.FieldLogger, group *metav1.AP sortCoreGroup(group) } + var items []*kubernetesResource for _, resource := range group.APIResources { - if err := r.backupResource(log, group, resource); err != nil { - log.WithError(err).WithField("resource", resource.String()).Error("Error backing up API resource") + resourceItems, err := r.getResourceItems(log, gv, resource) + if err != nil { + log.WithError(err).WithField("resource", resource.String()).Error("Error getting items for resource") + continue } + + items = append(items, resourceItems...) } - return nil + return items, nil } -// backupResource backs up all the objects for a given group-version-resource. -func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1.APIResourceList, resource metav1.APIResource) error { +// getResourceItems collects all relevant items for a given group-version-resource. +func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.GroupVersion, resource metav1.APIResource) ([]*kubernetesResource, error) { log = log.WithField("resource", resource.Name) - log.Info("Backing up resource") + log.Info("Getting items for resource") - gv, err := schema.ParseGroupVersion(group.GroupVersion) - if err != nil { - return errors.Wrapf(err, "error parsing GroupVersion %s", group.GroupVersion) - } - gr := schema.GroupResource{Group: gv.Group, Resource: resource.Name} + var ( + gvr = gv.WithResource(resource.Name) + gr = gvr.GroupResource() + clusterScoped = !resource.Namespaced + ) // Getting the preferred group version of this resource preferredGVR, _, err := r.discoveryHelper.ResourceFor(gr.WithVersion("")) if err != nil { - return errors.WithStack(err) + return nil, errors.WithStack(err) } - clusterScoped := !resource.Namespaced - // If the resource we are backing up is NOT namespaces, and it is cluster-scoped, check to see if // we should include it based on the IncludeClusterResources setting. if gr != kuberesource.Namespaces && clusterScoped { @@ -115,17 +130,17 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1 // If we're processing namespaces themselves, we will not skip here, they may be // filtered out later. 
log.Info("Skipping resource because it's cluster-scoped and only specific namespaces are included in the backup") - return nil + return nil, nil } } else if !*r.backupRequest.Spec.IncludeClusterResources { log.Info("Skipping resource because it's cluster-scoped") - return nil + return nil, nil } } if !r.backupRequest.ResourceIncludesExcludes.ShouldInclude(gr.String()) { log.Infof("Skipping resource because it's excluded") - return nil + return nil, nil } if cohabitator, found := r.cohabitatingResources[resource.Name]; found { @@ -136,13 +151,11 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1 "cohabitatingResource2": cohabitator.groupResource2.String(), }, ).Infof("Skipping resource because it cohabitates and we've already processed it") - return nil + return nil, nil } cohabitator.seen = true } - itemBackupper := r.newItemBackupper() - namespacesToList := getNamespacesToList(r.backupRequest.NamespaceIncludesExcludes) // Check if we're backing up namespaces, and only certain ones @@ -156,10 +169,11 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1 labelSelector, err = metav1.LabelSelectorAsSelector(r.backupRequest.Spec.LabelSelector) if err != nil { // This should never happen... - return errors.Wrap(err, "invalid label selector") + return nil, errors.Wrap(err, "invalid label selector") } } + var items []*kubernetesResource for _, ns := range namespacesToList { log = log.WithField("namespace", ns) log.Info("Getting namespace") @@ -175,12 +189,21 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1 continue } - if _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR); err != nil { - log.WithError(errors.WithStack(err)).Error("Error backing up namespace") + path, err := r.writeToFile(unstructured) + if err != nil { + log.WithError(err).Error("Error writing item to file") + continue } + + items = append(items, &kubernetesResource{ + groupResource: gr, + preferredGVR: preferredGVR, + name: ns, + path: path, + }) } - return nil + return items, nil } } @@ -189,7 +212,8 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1 namespacesToList = []string{""} } - backedUpItem := false + var items []*kubernetesResource + for _, namespace := range namespacesToList { log = log.WithField("namespace", namespace) @@ -212,101 +236,55 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1 } log.Infof("Retrieved %d items", len(unstructuredList.Items)) - // do the backup - for _, item := range unstructuredList.Items { - if r.backupItem(log, gr, itemBackupper, &item, preferredGVR) { - backedUpItem = true + // collect the items + for i := range unstructuredList.Items { + item := &unstructuredList.Items[i] + + if gr == kuberesource.Namespaces && !r.backupRequest.NamespaceIncludesExcludes.ShouldInclude(item.GetName()) { + log.WithField("name", item.GetName()).Info("Skipping namespace because it's excluded") + continue } + + path, err := r.writeToFile(item) + if err != nil { + log.WithError(err).Error("Error writing item to file") + continue + } + + items = append(items, &kubernetesResource{ + groupResource: gr, + preferredGVR: preferredGVR, + namespace: item.GetNamespace(), + name: item.GetName(), + path: path, + }) } } - // back up CRD for resource if found. We should only need to do this if we've backed up at least - // one item and IncludeClusterResources is nil. 
If IncludeClusterResources is false - // we don't want to back it up, and if it's true it will already be included. - if backedUpItem && r.backupRequest.Spec.IncludeClusterResources == nil { - r.backupCRD(log, gr, itemBackupper) - } - - return nil + return items, nil } -func (r *resourceBackupper) backupItem( - log logrus.FieldLogger, - gr schema.GroupResource, - itemBackupper ItemBackupper, - unstructured runtime.Unstructured, - preferredGVR schema.GroupVersionResource, -) bool { - metadata, err := meta.Accessor(unstructured) +func (r *itemCollector) writeToFile(item *unstructured.Unstructured) (string, error) { + f, err := ioutil.TempFile(r.dir, "") if err != nil { - log.WithError(errors.WithStack(err)).Error("Error getting a metadata accessor") - return false + return "", errors.Wrap(err, "error creating temp file") } + defer f.Close() - log = log.WithFields(map[string]interface{}{ - "namespace": metadata.GetNamespace(), - "name": metadata.GetName(), - }) - - if gr == kuberesource.Namespaces && !r.backupRequest.NamespaceIncludesExcludes.ShouldInclude(metadata.GetName()) { - log.Info("Skipping namespace because it's excluded") - return false - } - - backedUpItem, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR) - if aggregate, ok := err.(kubeerrs.Aggregate); ok { - log.Infof("%d errors encountered backup up item", len(aggregate.Errors())) - // log each error separately so we get error location info in the log, and an - // accurate count of errors - for _, err = range aggregate.Errors() { - log.WithError(err).Error("Error backing up item") - } - - return false - } + jsonBytes, err := json.Marshal(item) if err != nil { - log.WithError(err).Error("Error backing up item") - return false + return "", errors.Wrap(err, "error converting item to JSON") } - return backedUpItem -} -// backupCRD checks if the resource is a custom resource, and if so, backs up the custom resource definition -// associated with it. -func (r *resourceBackupper) backupCRD(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper ItemBackupper) { - crdGroupResource := kuberesource.CustomResourceDefinitions - - log.Debugf("Getting server preferred API version for %s", crdGroupResource) - gvr, apiResource, err := r.discoveryHelper.ResourceFor(crdGroupResource.WithVersion("")) - if err != nil { - log.WithError(errors.WithStack(err)).Errorf("Error getting resolved resource for %s", crdGroupResource) - return + if _, err := f.Write(jsonBytes); err != nil { + return "", errors.Wrap(err, "error writing JSON to file") } - log.Debugf("Got server preferred API version %s for %s", gvr.Version, crdGroupResource) - log.Debugf("Getting dynamic client for %s", gvr.String()) - crdClient, err := r.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), apiResource, "") - if err != nil { - log.WithError(errors.WithStack(err)).Errorf("Error getting dynamic client for %s", crdGroupResource) - return + if err := f.Close(); err != nil { + return "", errors.Wrap(err, "error closing file") } - log.Debugf("Got dynamic client for %s", gvr.String()) - // try to get a CRD whose name matches the provided GroupResource - unstructured, err := crdClient.Get(gr.String(), metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - // not found: this means the GroupResource provided was not a - // custom resource, so there's no CRD to back up. 
- log.Debugf("No CRD found for GroupResource %s", gr.String()) - return - } - if err != nil { - log.WithError(errors.WithStack(err)).Errorf("Error getting CRD %s", gr.String()) - return - } - log.Infof("Found associated CRD %s to add to backup", gr.String()) - - r.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr) + return f.Name(), nil } // sortCoreGroup sorts the core API group. diff --git a/pkg/backup/resource_backupper_test.go b/pkg/backup/item_collector_test.go similarity index 100% rename from pkg/backup/resource_backupper_test.go rename to pkg/backup/item_collector_test.go