split out collecting items from backing up items

Signed-off-by: Steve Kriss <krisss@vmware.com>
pull/2440/head
Steve Kriss 2020-04-13 14:53:58 -06:00
parent 19c52434b4
commit 30ca0e4322
4 changed files with 247 additions and 162 deletions

View File

@@ -1,5 +1,5 @@
/*
Copyright 2017 the Velero contributors.
Copyright 2017, 2020 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -20,20 +20,27 @@ import (
"archive/tar"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
"github.com/vmware-tanzu/velero/pkg/podexec"
"github.com/vmware-tanzu/velero/pkg/restic"
@@ -49,7 +56,7 @@ const BackupFormatVersion = "1.1.0"
// Backupper performs backups.
type Backupper interface {
// Backup takes a backup using the specification in the api.Backup and writes backup and log data
// Backup takes a backup using the specification in the velerov1api.Backup and writes backup and log data
// to the given writers.
Backup(logger logrus.FieldLogger, backup *Request, backupFile io.Writer, actions []velero.BackupItemAction, volumeSnapshotterGetter VolumeSnapshotterGetter) error
}
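As a point of reference, here is a hypothetical sketch of driving the Backupper interface from calling code. The package alias pkgbackup, the pre-built Request, and the nil action/snapshotter arguments are assumptions for illustration only; they are not part of this change.

package backupdriver

import (
	"io"

	"github.com/sirupsen/logrus"

	pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
)

// runBackup is a hypothetical helper that streams the backup tarball to out.
// Passing nil for the actions slice and the VolumeSnapshotterGetter keeps the
// sketch short; a real caller supplies working implementations of both.
func runBackup(kb pkgbackup.Backupper, logger logrus.FieldLogger, req *pkgbackup.Request, out io.Writer) error {
	return kb.Backup(logger, req, out, nil, nil)
}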
@@ -160,11 +167,11 @@ func getResourceIncludesExcludes(helper discovery.Helper, includes, excludes []s
// getNamespaceIncludesExcludes returns an IncludesExcludes list containing which namespaces to
// include and exclude from the backup.
func getNamespaceIncludesExcludes(backup *api.Backup) *collections.IncludesExcludes {
func getNamespaceIncludesExcludes(backup *velerov1api.Backup) *collections.IncludesExcludes {
return collections.NewIncludesExcludes().Includes(backup.Spec.IncludedNamespaces...).Excludes(backup.Spec.ExcludedNamespaces...)
}
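The IncludesExcludes helper built here drives all of the namespace (and, below, resource) filtering. A small self-contained sketch of its behavior, using made-up namespace names and the assumed filtering semantics (excludes win, and a non-empty include list is exhaustive):

package main

import (
	"fmt"

	"github.com/vmware-tanzu/velero/pkg/util/collections"
)

func main() {
	// Mirrors what getNamespaceIncludesExcludes builds from a backup spec.
	ie := collections.NewIncludesExcludes().
		Includes("default", "app-ns").
		Excludes("kube-system")

	fmt.Println(ie.ShouldInclude("default"))     // true: explicitly included
	fmt.Println(ie.ShouldInclude("kube-system")) // false: explicitly excluded
	fmt.Println(ie.ShouldInclude("other-ns"))    // false: not in the include list
}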
func getResourceHooks(hookSpecs []api.BackupResourceHookSpec, discoveryHelper discovery.Helper) ([]resourceHook, error) {
func getResourceHooks(hookSpecs []velerov1api.BackupResourceHookSpec, discoveryHelper discovery.Helper) ([]resourceHook, error) {
resourceHooks := make([]resourceHook, 0, len(hookSpecs))
for _, s := range hookSpecs {
@@ -179,7 +186,7 @@ func getResourceHooks(hookSpecs []api.BackupResourceHookSpec, discoveryHelper di
return resourceHooks, nil
}
func getResourceHook(hookSpec api.BackupResourceHookSpec, discoveryHelper discovery.Helper) (resourceHook, error) {
func getResourceHook(hookSpec velerov1api.BackupResourceHookSpec, discoveryHelper discovery.Helper) (resourceHook, error) {
h := resourceHook{
name: hookSpec.Name,
namespaces: collections.NewIncludesExcludes().Includes(hookSpec.IncludedNamespaces...).Excludes(hookSpec.ExcludedNamespaces...),
@@ -204,7 +211,7 @@ type VolumeSnapshotterGetter interface {
}
// Backup backs up the items specified in the Backup, placing them in a gzip-compressed tar file
// written to backupFile. The finalized api.Backup is written to metadata. Any error that represents
// written to backupFile. The finalized velerov1api.Backup is written to metadata. Any error that represents
// a complete backup failure is returned. Errors that constitute partial failures (i.e. failures to
// back up individual resources that don't prevent the backup from continuing to be processed) are logged
// to the backup log.
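Because Backup streams a gzip-compressed tar to backupFile, the result can be inspected with nothing but the standard library. A hedged sketch (the file name is made up, and the entry-layout comment reflects the documented Velero backup format rather than anything in this diff):

package main

import (
	"archive/tar"
	"compress/gzip"
	"fmt"
	"io"
	"log"
	"os"
)

func main() {
	f, err := os.Open("backup.tar.gz") // hypothetical output of Backup
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	gzr, err := gzip.NewReader(f)
	if err != nil {
		log.Fatal(err)
	}
	defer gzr.Close()

	tr := tar.NewReader(gzr)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		// Entries look roughly like resources/<resource.group>/namespaces/<ns>/<name>.json,
		// plus metadata such as the backup format version file.
		fmt.Println(hdr.Name)
	}
}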
@@ -242,7 +249,7 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
backupRequest.BackedUpItems = map[itemKey]struct{}{}
podVolumeTimeout := kb.resticTimeout
if val := backupRequest.Annotations[api.PodVolumeOperationTimeoutAnnotation]; val != "" {
if val := backupRequest.Annotations[velerov1api.PodVolumeOperationTimeoutAnnotation]; val != "" {
parsed, err := time.ParseDuration(val)
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Unable to parse pod volume timeout annotation %s, using server value.", val)
@@ -262,41 +269,146 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
}
}
pvcSnapshotTracker := newPVCSnapshotTracker()
newItemBackupper := func() ItemBackupper {
itemBackupper := &defaultItemBackupper{
backupRequest: backupRequest,
tarWriter: tw,
dynamicFactory: kb.dynamicFactory,
discoveryHelper: kb.discoveryHelper,
resticBackupper: resticBackupper,
resticSnapshotTracker: pvcSnapshotTracker,
volumeSnapshotterGetter: volumeSnapshotterGetter,
itemHookHandler: &defaultItemHookHandler{
podCommandExecutor: kb.podCommandExecutor,
},
}
itemBackupper.additionalItemBackupper = itemBackupper
return itemBackupper
// set up a temp dir for the itemCollector to use to temporarily
// store items as they're scraped from the API.
tempDir, err := ioutil.TempDir("", "")
if err != nil {
return errors.Wrap(err, "error creating temp dir for backup")
}
defer os.RemoveAll(tempDir)
resourceBackupper := &resourceBackupper{
collector := &itemCollector{
log: log,
backupRequest: backupRequest,
discoveryHelper: kb.discoveryHelper,
dynamicFactory: kb.dynamicFactory,
cohabitatingResources: cohabitatingResources(),
newItemBackupper: newItemBackupper,
dir: tempDir,
}
resourceBackupper.backupAllGroups()
items := collector.getAllItems()
log.WithField("progress", "").Infof("Collected %d items matching the backup spec from the Kubernetes API (actual number of items backed up may be more or less depending on velero.io/exclude-from-backup annotation, plugins returning additional related items to back up, etc.)", len(items))
itemBackupper := &itemBackupper{
backupRequest: backupRequest,
tarWriter: tw,
dynamicFactory: kb.dynamicFactory,
discoveryHelper: kb.discoveryHelper,
resticBackupper: resticBackupper,
resticSnapshotTracker: newPVCSnapshotTracker(),
volumeSnapshotterGetter: volumeSnapshotterGetter,
itemHookHandler: &defaultItemHookHandler{
podCommandExecutor: kb.podCommandExecutor,
},
}
backedUpGroupResources := map[schema.GroupResource]bool{}
for i, item := range items {
log.WithFields(map[string]interface{}{
"progress": "",
"resource": item.groupResource.String(),
"namespace": item.namespace,
"name": item.name,
}).Infof("Processing item %d of %d", i+1, len(items))
// use an anonymous func so we can defer-close/remove the file
// as soon as we're done with it
func() {
var unstructured unstructured.Unstructured
f, err := os.Open(item.path)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error opening file containing item")
return
}
defer f.Close()
defer os.Remove(f.Name())
if err := json.NewDecoder(f).Decode(&unstructured); err != nil {
log.WithError(errors.WithStack(err)).Error("Error decoding JSON from file")
return
}
if backedUp := kb.backupItem(log, item.groupResource, itemBackupper, &unstructured, item.preferredGVR); backedUp {
backedUpGroupResources[item.groupResource] = true
}
}()
}
// back up CRD for resource if found. We should only need to do this if we've backed up at least
// one item for the resource and IncludeClusterResources is nil. If IncludeClusterResources is false
// we don't want to back it up, and if it's true it will already be included.
if backupRequest.Spec.IncludeClusterResources == nil {
for gr := range backedUpGroupResources {
kb.backupCRD(log, gr, itemBackupper)
}
}
log.WithField("progress", "").Infof("Backed up a total of %d items", len(backupRequest.BackedUpItems))
return nil
}
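The anonymous func inside the item-processing loop above exists so the deferred Close and Remove run once per item instead of accumulating until Backup returns. A generic standard-library illustration of that pattern (file contents and names are made up):

package main

import (
	"fmt"
	"io/ioutil"
	"os"
)

func main() {
	// Create a few temp files standing in for collected items.
	paths := make([]string, 3)
	for i := range paths {
		f, err := ioutil.TempFile("", "item")
		if err != nil {
			fmt.Println("error creating temp file:", err)
			return
		}
		fmt.Fprintf(f, "item %d", i)
		f.Close()
		paths[i] = f.Name()
	}

	for _, p := range paths {
		// Without the closure, these defers would only run when main returns,
		// keeping every file open (and on disk) until the very end.
		func() {
			f, err := os.Open(p)
			if err != nil {
				fmt.Println("open error:", err)
				return
			}
			defer f.Close()
			defer os.Remove(f.Name())

			b, _ := ioutil.ReadAll(f)
			fmt.Println(string(b))
		}()
	}
}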
func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource) bool {
backedUpItem, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR)
if aggregate, ok := err.(kubeerrs.Aggregate); ok {
log.WithField("name", unstructured.GetName()).Infof("%d errors encountered backup up item", len(aggregate.Errors()))
// log each error separately so we get error location info in the log, and an
// accurate count of errors
for _, err = range aggregate.Errors() {
log.WithError(err).WithField("name", unstructured.GetName()).Error("Error backing up item")
}
return false
}
if err != nil {
log.WithError(err).WithField("name", unstructured.GetName()).Error("Error backing up item")
return false
}
return backedUpItem
}
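backupItem special-cases apimachinery's Aggregate so each wrapped error gets its own log line and an accurate count. A self-contained sketch of constructing and unpacking one (the error messages are invented):

package main

import (
	"errors"
	"fmt"

	kubeerrs "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	var err error = kubeerrs.NewAggregate([]error{
		errors.New("error executing pre hook"),
		errors.New("error snapshotting volume"),
	})

	// Mirrors the type assertion in backupItem.
	if aggregate, ok := err.(kubeerrs.Aggregate); ok {
		fmt.Printf("%d errors encountered backing up item\n", len(aggregate.Errors()))
		for _, e := range aggregate.Errors() {
			fmt.Println(" -", e)
		}
	}
}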
// backupCRD checks if the resource is a custom resource, and if so, backs up the custom resource definition
// associated with it.
func (kb *kubernetesBackupper) backupCRD(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper) {
crdGroupResource := kuberesource.CustomResourceDefinitions
log.Debugf("Getting server preferred API version for %s", crdGroupResource)
gvr, apiResource, err := kb.discoveryHelper.ResourceFor(crdGroupResource.WithVersion(""))
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting resolved resource for %s", crdGroupResource)
return
}
log.Debugf("Got server preferred API version %s for %s", gvr.Version, crdGroupResource)
log.Debugf("Getting dynamic client for %s", gvr.String())
crdClient, err := kb.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), apiResource, "")
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting dynamic client for %s", crdGroupResource)
return
}
log.Debugf("Got dynamic client for %s", gvr.String())
// try to get a CRD whose name matches the provided GroupResource
unstructured, err := crdClient.Get(gr.String(), metav1.GetOptions{})
if apierrors.IsNotFound(err) {
// not found: this means the GroupResource provided was not a
// custom resource, so there's no CRD to back up.
log.Debugf("No CRD found for GroupResource %s", gr.String())
return
}
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting CRD %s", gr.String())
return
}
log.Infof("Found associated CRD %s to add to backup", gr.String())
kb.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr)
}
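The Get call above works because a CRD's metadata.name is the plural resource followed by its group, which is exactly what GroupResource.String() produces. A quick illustration using Velero's own Backup CRD:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	gr := schema.GroupResource{Group: "velero.io", Resource: "backups"}

	// Prints "backups.velero.io", which matches the name of the corresponding
	// CustomResourceDefinition object; that is why crdClient.Get(gr.String(), ...)
	// finds it.
	fmt.Println(gr.String())
}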
func (kb *kubernetesBackupper) writeBackupVersion(tw *tar.Writer) error {
versionFile := filepath.Join(api.MetadataDir, "version")
versionFile := filepath.Join(velerov1api.MetadataDir, "version")
versionString := fmt.Sprintf("%s\n", BackupFormatVersion)
hdr := &tar.Header{

View File

@@ -43,12 +43,8 @@ import (
"github.com/vmware-tanzu/velero/pkg/volume"
)
// ItemBackupper can back up individual items to a tar writer.
type ItemBackupper interface {
backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource) (bool, error)
}
type defaultItemBackupper struct {
// itemBackupper can back up individual items to a tar writer.
type itemBackupper struct {
backupRequest *Request
tarWriter tarWriter
dynamicFactory client.DynamicFactory
@@ -58,7 +54,6 @@ type defaultItemBackupper struct {
volumeSnapshotterGetter VolumeSnapshotterGetter
itemHookHandler itemHookHandler
additionalItemBackupper ItemBackupper
snapshotLocationVolumeSnapshotters map[string]velero.VolumeSnapshotter
}
@@ -66,7 +61,7 @@ type defaultItemBackupper struct {
// namespaces IncludesExcludes list.
// In addition to the error return, backupItem also returns a bool indicating whether the item
// was actually backed up.
func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource) (bool, error) {
func (ib *itemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource) (bool, error) {
metadata, err := meta.Accessor(obj)
if err != nil {
return false, err
@@ -284,7 +279,7 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
// backupPodVolumes triggers restic backups of the specified pod volumes, and returns a list of PodVolumeBackups
// for volumes that were successfully backed up, and a slice of any errors that were encountered.
func (ib *defaultItemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *corev1api.Pod, volumes []string) ([]*velerov1api.PodVolumeBackup, []error) {
func (ib *itemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *corev1api.Pod, volumes []string) ([]*velerov1api.PodVolumeBackup, []error) {
if len(volumes) == 0 {
return nil, nil
}
@@ -297,7 +292,7 @@ func (ib *defaultItemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *co
return ib.resticBackupper.BackupPodVolumes(ib.backupRequest.Backup, pod, volumes, log)
}
func (ib *defaultItemBackupper) executeActions(
func (ib *itemBackupper) executeActions(
log logrus.FieldLogger,
obj runtime.Unstructured,
groupResource schema.GroupResource,
@@ -349,7 +344,7 @@ func (ib *defaultItemBackupper) executeActions(
return nil, errors.WithStack(err)
}
if _, err = ib.additionalItemBackupper.backupItem(log, additionalItem, gvr.GroupResource(), gvr); err != nil {
if _, err = ib.backupItem(log, additionalItem, gvr.GroupResource(), gvr); err != nil {
return nil, err
}
}
@@ -360,7 +355,7 @@ func (ib *defaultItemBackupper) executeActions(
// volumeSnapshotter instantiates and initializes a VolumeSnapshotter given a VolumeSnapshotLocation,
// or returns an existing one if one's already been initialized for the location.
func (ib *defaultItemBackupper) volumeSnapshotter(snapshotLocation *velerov1api.VolumeSnapshotLocation) (velero.VolumeSnapshotter, error) {
func (ib *itemBackupper) volumeSnapshotter(snapshotLocation *velerov1api.VolumeSnapshotLocation) (velero.VolumeSnapshotter, error) {
if bs, ok := ib.snapshotLocationVolumeSnapshotters[snapshotLocation.Name]; ok {
return bs, nil
}
@@ -394,7 +389,7 @@ const (
// takePVSnapshot triggers a snapshot for the volume/disk underlying a PersistentVolume if the provided
// backup has volume snapshots enabled and the PV is of a compatible type. Also records cloud
// disk type and IOPS (if applicable) to be able to restore to current state later.
func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.FieldLogger) error {
func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.FieldLogger) error {
log.Info("Executing takePVSnapshot")
if boolptr.IsSetToFalse(ib.backupRequest.Spec.SnapshotVolumes) {

View File

@@ -17,18 +17,17 @@ limitations under the License.
package backup
import (
"encoding/json"
"io/ioutil"
"sort"
"strings"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
@@ -36,36 +35,49 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/collections"
)
// resourceBackupper collects resources from the Kubernetes API according to
// the backup spec and passes them to an itemBackupper to be backed up.
type resourceBackupper struct {
// itemCollector collects items from the Kubernetes API according to
// the backup spec and writes them to files inside dir.
type itemCollector struct {
log logrus.FieldLogger
backupRequest *Request
discoveryHelper discovery.Helper
dynamicFactory client.DynamicFactory
cohabitatingResources map[string]*cohabitatingResource
newItemBackupper func() ItemBackupper
dir string
}
// collect backs up all API groups.
func (r *resourceBackupper) backupAllGroups() {
type kubernetesResource struct {
groupResource schema.GroupResource
preferredGVR schema.GroupVersionResource
namespace, name, path string
}
// getAllItems gets all relevant items from all API groups.
func (r *itemCollector) getAllItems() []*kubernetesResource {
var resources []*kubernetesResource
for _, group := range r.discoveryHelper.Resources() {
if err := r.backupGroup(r.log, group); err != nil {
r.log.WithError(err).WithField("apiGroup", group.String()).Error("Error backing up API group")
groupItems, err := r.getGroupItems(r.log, group)
if err != nil {
r.log.WithError(err).WithField("apiGroup", group.String()).Error("Error collecting resources from API group")
continue
}
resources = append(resources, groupItems...)
}
return resources
}
// backupGroup backs up a single API group.
func (r *resourceBackupper) backupGroup(log logrus.FieldLogger, group *metav1.APIResourceList) error {
// getGroupItems collects all relevant items from a single API group.
func (r *itemCollector) getGroupItems(log logrus.FieldLogger, group *metav1.APIResourceList) ([]*kubernetesResource, error) {
log = log.WithField("group", group.GroupVersion)
log.Infof("Backing up group")
log.Infof("Getting items for group")
// Parse so we can check if this is the core group
gv, err := schema.ParseGroupVersion(group.GroupVersion)
if err != nil {
return errors.Wrapf(err, "error parsing GroupVersion %q", group.GroupVersion)
return nil, errors.Wrapf(err, "error parsing GroupVersion %q", group.GroupVersion)
}
if gv.Group == "" {
// This is the core group, so make sure we process in the following order: pods, pvcs, pvs,
@@ -73,35 +85,38 @@ func (r *resourceBackupper) backupGroup(log logrus.FieldLogger, group *metav1.AP
sortCoreGroup(group)
}
var items []*kubernetesResource
for _, resource := range group.APIResources {
if err := r.backupResource(log, group, resource); err != nil {
log.WithError(err).WithField("resource", resource.String()).Error("Error backing up API resource")
resourceItems, err := r.getResourceItems(log, gv, resource)
if err != nil {
log.WithError(err).WithField("resource", resource.String()).Error("Error getting items for resource")
continue
}
items = append(items, resourceItems...)
}
return nil
return items, nil
}
// backupResource backs up all the objects for a given group-version-resource.
func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1.APIResourceList, resource metav1.APIResource) error {
// getResourceItems collects all relevant items for a given group-version-resource.
func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.GroupVersion, resource metav1.APIResource) ([]*kubernetesResource, error) {
log = log.WithField("resource", resource.Name)
log.Info("Backing up resource")
log.Info("Getting items for resource")
gv, err := schema.ParseGroupVersion(group.GroupVersion)
if err != nil {
return errors.Wrapf(err, "error parsing GroupVersion %s", group.GroupVersion)
}
gr := schema.GroupResource{Group: gv.Group, Resource: resource.Name}
var (
gvr = gv.WithResource(resource.Name)
gr = gvr.GroupResource()
clusterScoped = !resource.Namespaced
)
// Getting the preferred group version of this resource
preferredGVR, _, err := r.discoveryHelper.ResourceFor(gr.WithVersion(""))
if err != nil {
return errors.WithStack(err)
return nil, errors.WithStack(err)
}
clusterScoped := !resource.Namespaced
// If the resource we are backing up is NOT namespaces, and it is cluster-scoped, check to see if
// we should include it based on the IncludeClusterResources setting.
if gr != kuberesource.Namespaces && clusterScoped {
@@ -115,17 +130,17 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1
// If we're processing namespaces themselves, we will not skip here, they may be
// filtered out later.
log.Info("Skipping resource because it's cluster-scoped and only specific namespaces are included in the backup")
return nil
return nil, nil
}
} else if !*r.backupRequest.Spec.IncludeClusterResources {
log.Info("Skipping resource because it's cluster-scoped")
return nil
return nil, nil
}
}
if !r.backupRequest.ResourceIncludesExcludes.ShouldInclude(gr.String()) {
log.Infof("Skipping resource because it's excluded")
return nil
return nil, nil
}
if cohabitator, found := r.cohabitatingResources[resource.Name]; found {
@@ -136,13 +151,11 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1
"cohabitatingResource2": cohabitator.groupResource2.String(),
},
).Infof("Skipping resource because it cohabitates and we've already processed it")
return nil
return nil, nil
}
cohabitator.seen = true
}
itemBackupper := r.newItemBackupper()
namespacesToList := getNamespacesToList(r.backupRequest.NamespaceIncludesExcludes)
// Check if we're backing up namespaces, and only certain ones
@@ -156,10 +169,11 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1
labelSelector, err = metav1.LabelSelectorAsSelector(r.backupRequest.Spec.LabelSelector)
if err != nil {
// This should never happen...
return errors.Wrap(err, "invalid label selector")
return nil, errors.Wrap(err, "invalid label selector")
}
}
var items []*kubernetesResource
for _, ns := range namespacesToList {
log = log.WithField("namespace", ns)
log.Info("Getting namespace")
@@ -175,12 +189,21 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1
continue
}
if _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR); err != nil {
log.WithError(errors.WithStack(err)).Error("Error backing up namespace")
path, err := r.writeToFile(unstructured)
if err != nil {
log.WithError(err).Error("Error writing item to file")
continue
}
items = append(items, &kubernetesResource{
groupResource: gr,
preferredGVR: preferredGVR,
name: ns,
path: path,
})
}
return nil
return items, nil
}
}
@@ -189,7 +212,8 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1
namespacesToList = []string{""}
}
backedUpItem := false
var items []*kubernetesResource
for _, namespace := range namespacesToList {
log = log.WithField("namespace", namespace)
@@ -212,101 +236,55 @@ func (r *resourceBackupper) backupResource(log logrus.FieldLogger, group *metav1
}
log.Infof("Retrieved %d items", len(unstructuredList.Items))
// do the backup
for _, item := range unstructuredList.Items {
if r.backupItem(log, gr, itemBackupper, &item, preferredGVR) {
backedUpItem = true
// collect the items
for i := range unstructuredList.Items {
item := &unstructuredList.Items[i]
if gr == kuberesource.Namespaces && !r.backupRequest.NamespaceIncludesExcludes.ShouldInclude(item.GetName()) {
log.WithField("name", item.GetName()).Info("Skipping namespace because it's excluded")
continue
}
path, err := r.writeToFile(item)
if err != nil {
log.WithError(err).Error("Error writing item to file")
continue
}
items = append(items, &kubernetesResource{
groupResource: gr,
preferredGVR: preferredGVR,
namespace: item.GetNamespace(),
name: item.GetName(),
path: path,
})
}
}
// back up CRD for resource if found. We should only need to do this if we've backed up at least
// one item and IncludeClusterResources is nil. If IncludeClusterResources is false
// we don't want to back it up, and if it's true it will already be included.
if backedUpItem && r.backupRequest.Spec.IncludeClusterResources == nil {
r.backupCRD(log, gr, itemBackupper)
}
return nil
return items, nil
}
func (r *resourceBackupper) backupItem(
log logrus.FieldLogger,
gr schema.GroupResource,
itemBackupper ItemBackupper,
unstructured runtime.Unstructured,
preferredGVR schema.GroupVersionResource,
) bool {
metadata, err := meta.Accessor(unstructured)
func (r *itemCollector) writeToFile(item *unstructured.Unstructured) (string, error) {
f, err := ioutil.TempFile(r.dir, "")
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error getting a metadata accessor")
return false
return "", errors.Wrap(err, "error creating temp file")
}
defer f.Close()
log = log.WithFields(map[string]interface{}{
"namespace": metadata.GetNamespace(),
"name": metadata.GetName(),
})
if gr == kuberesource.Namespaces && !r.backupRequest.NamespaceIncludesExcludes.ShouldInclude(metadata.GetName()) {
log.Info("Skipping namespace because it's excluded")
return false
}
backedUpItem, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR)
if aggregate, ok := err.(kubeerrs.Aggregate); ok {
log.Infof("%d errors encountered backup up item", len(aggregate.Errors()))
// log each error separately so we get error location info in the log, and an
// accurate count of errors
for _, err = range aggregate.Errors() {
log.WithError(err).Error("Error backing up item")
}
return false
}
jsonBytes, err := json.Marshal(item)
if err != nil {
log.WithError(err).Error("Error backing up item")
return false
return "", errors.Wrap(err, "error converting item to JSON")
}
return backedUpItem
}
// backupCRD checks if the resource is a custom resource, and if so, backs up the custom resource definition
// associated with it.
func (r *resourceBackupper) backupCRD(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper ItemBackupper) {
crdGroupResource := kuberesource.CustomResourceDefinitions
log.Debugf("Getting server preferred API version for %s", crdGroupResource)
gvr, apiResource, err := r.discoveryHelper.ResourceFor(crdGroupResource.WithVersion(""))
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting resolved resource for %s", crdGroupResource)
return
if _, err := f.Write(jsonBytes); err != nil {
return "", errors.Wrap(err, "error writing JSON to file")
}
log.Debugf("Got server preferred API version %s for %s", gvr.Version, crdGroupResource)
log.Debugf("Getting dynamic client for %s", gvr.String())
crdClient, err := r.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), apiResource, "")
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting dynamic client for %s", crdGroupResource)
return
if err := f.Close(); err != nil {
return "", errors.Wrap(err, "error closing file")
}
log.Debugf("Got dynamic client for %s", gvr.String())
// try to get a CRD whose name matches the provided GroupResource
unstructured, err := crdClient.Get(gr.String(), metav1.GetOptions{})
if apierrors.IsNotFound(err) {
// not found: this means the GroupResource provided was not a
// custom resource, so there's no CRD to back up.
log.Debugf("No CRD found for GroupResource %s", gr.String())
return
}
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting CRD %s", gr.String())
return
}
log.Infof("Found associated CRD %s to add to backup", gr.String())
r.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr)
return f.Name(), nil
}
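Taken together, writeToFile and the decode step back in Backup form a plain JSON round trip through a temp file. A self-contained sketch of that round trip using an Unstructured object (the object contents are arbitrary):

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"os"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	item := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "ConfigMap",
		"metadata":   map[string]interface{}{"namespace": "default", "name": "example"},
	}}

	// Write side: what writeToFile does for each collected item.
	f, err := ioutil.TempFile("", "")
	if err != nil {
		log.Fatal(err)
	}
	jsonBytes, err := json.Marshal(item)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := f.Write(jsonBytes); err != nil {
		log.Fatal(err)
	}
	if err := f.Close(); err != nil {
		log.Fatal(err)
	}

	// Read side: what Backup does before handing the item to the itemBackupper.
	g, err := os.Open(f.Name())
	if err != nil {
		log.Fatal(err)
	}
	defer g.Close()
	defer os.Remove(g.Name())

	var decoded unstructured.Unstructured
	if err := json.NewDecoder(g).Decode(&decoded); err != nil {
		log.Fatal(err)
	}
	fmt.Println(decoded.GetNamespace(), decoded.GetName())
}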
// sortCoreGroup sorts the core API group.