create pkg/archive with functionality extracted from pkg/restore ()

* move pkg/restore's backup extractor to pkg/archive

Signed-off-by: Steve Kriss <krisss@vmware.com>
pull/1827/head
Steve Kriss 2019-08-28 19:03:01 -06:00 committed by KubeKween
parent 60f9898ca0
commit e210626a36
5 changed files with 347 additions and 121 deletions

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package restore
package archive
import (
"archive/tar"
@ -27,15 +27,22 @@ import (
"github.com/heptio/velero/pkg/util/filesystem"
)
// backupExtractor unzips/extracts a backup tarball to a local
// Extractor unzips/extracts a backup tarball to a local
// temp directory.
type backupExtractor struct {
log logrus.FieldLogger
fileSystem filesystem.Interface
type Extractor struct {
log logrus.FieldLogger // logger for informational/error messages during extraction
fs filesystem.Interface // filesystem interface used for all file/dir operations (allows faking in tests)
}
// unzipAndExtractBackup extracts a reader on a gzipped tarball to a local temp directory
func (e *backupExtractor) unzipAndExtractBackup(src io.Reader) (string, error) {
// NewExtractor constructs an Extractor that logs via log and performs
// all filesystem operations through fs.
func NewExtractor(log logrus.FieldLogger, fs filesystem.Interface) *Extractor {
	e := new(Extractor)
	e.log = log
	e.fs = fs
	return e
}
// UnzipAndExtractBackup extracts a reader on a gzipped tarball to a local temp directory
func (e *Extractor) UnzipAndExtractBackup(src io.Reader) (string, error) {
gzr, err := gzip.NewReader(src)
if err != nil {
e.log.Infof("error creating gzip reader: %v", err)
@ -46,8 +53,8 @@ func (e *backupExtractor) unzipAndExtractBackup(src io.Reader) (string, error) {
return e.readBackup(tar.NewReader(gzr))
}
func (e *backupExtractor) writeFile(target string, tarRdr *tar.Reader) error {
file, err := e.fileSystem.Create(target)
func (e *Extractor) writeFile(target string, tarRdr *tar.Reader) error {
file, err := e.fs.Create(target)
if err != nil {
return err
}
@ -59,8 +66,8 @@ func (e *backupExtractor) writeFile(target string, tarRdr *tar.Reader) error {
return nil
}
func (e *backupExtractor) readBackup(tarRdr *tar.Reader) (string, error) {
dir, err := e.fileSystem.TempDir("", "")
func (e *Extractor) readBackup(tarRdr *tar.Reader) (string, error) {
dir, err := e.fs.TempDir("", "")
if err != nil {
e.log.Infof("error creating temp dir: %v", err)
return "", err
@ -81,7 +88,7 @@ func (e *backupExtractor) readBackup(tarRdr *tar.Reader) (string, error) {
switch header.Typeflag {
case tar.TypeDir:
err := e.fileSystem.MkdirAll(target, header.FileInfo().Mode())
err := e.fs.MkdirAll(target, header.FileInfo().Mode())
if err != nil {
e.log.Infof("mkdirall error: %v", err)
return "", err
@ -89,7 +96,7 @@ func (e *backupExtractor) readBackup(tarRdr *tar.Reader) (string, error) {
case tar.TypeReg:
// make sure we have the directory created
err := e.fileSystem.MkdirAll(filepath.Dir(target), header.FileInfo().Mode())
err := e.fs.MkdirAll(filepath.Dir(target), header.FileInfo().Mode())
if err != nil {
e.log.Infof("mkdirall error: %v", err)
return "", err

167
pkg/archive/parser.go Normal file
View File

@ -0,0 +1,167 @@
/*
Copyright 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package archive
import (
"path/filepath"
"strings"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
velerov1api "github.com/heptio/velero/pkg/apis/velero/v1"
"github.com/heptio/velero/pkg/util/filesystem"
)
// Parser traverses an extracted archive on disk to validate
// it and provide a helpful representation of it to consumers.
type Parser struct {
log logrus.FieldLogger // logger for warnings about unexpected archive contents
fs filesystem.Interface // filesystem interface used for all reads (allows faking in tests)
}
// ResourceItems contains the collection of items of a given resource type
// within a backup, grouped by namespace (or empty string for cluster-scoped
// resources).
type ResourceItems struct {
// GroupResource is API group and resource name,
// formatted as "resource.group". For the "core"
// API group, the ".group" suffix is omitted.
GroupResource string
// ItemsByNamespace is a map from namespace (or empty string
// for cluster-scoped resources) to a list of individual item
// names contained in the archive. Item names **do not** include
// the file extension (e.g. "item-1", not "item-1.json").
ItemsByNamespace map[string][]string
}
// NewParser constructs a Parser that logs via log and reads the
// extracted archive through fs.
func NewParser(log logrus.FieldLogger, fs filesystem.Interface) *Parser {
	p := new(Parser)
	p.log = log
	p.fs = fs
	return p
}
// Parse reads an extracted backup on the file system and returns
// a structured catalog of the resources and items contained within it,
// keyed by "resource.group" name. It returns an error if the archive's
// required top-level "resources" directory is missing or unreadable.
func (p *Parser) Parse(dir string) (map[string]*ResourceItems, error) {
	// rel strips the archive root prefix from a path so that log and
	// error messages refer to archive-relative paths.
	rel := func(path string) string {
		return strings.TrimPrefix(path, dir+"/")
	}

	// The top-level "resources" directory must exist; each subdirectory
	// of it is expected to correspond to one resource type.
	resourcesDir := filepath.Join(dir, velerov1api.ResourcesDir)
	switch exists, err := p.fs.DirExists(resourcesDir); {
	case err != nil:
		return nil, errors.Wrapf(err, "error checking for existence of directory %q", rel(resourcesDir))
	case !exists:
		return nil, errors.Errorf("directory %q does not exist", rel(resourcesDir))
	}

	resourceDirs, err := p.fs.ReadDir(resourcesDir)
	if err != nil {
		return nil, errors.Wrapf(err, "error reading contents of directory %q", rel(resourcesDir))
	}

	// Build the catalog of items, one entry per resource subdirectory.
	resources := map[string]*ResourceItems{}
	for _, rscDir := range resourceDirs {
		if !rscDir.IsDir() {
			p.log.Warnf("Ignoring unexpected file %q in directory %q", rscDir.Name(), rel(resourcesDir))
			continue
		}

		catalog := &ResourceItems{
			GroupResource:    rscDir.Name(),
			ItemsByNamespace: map[string][]string{},
		}

		// Cluster-scoped instances of this resource, if any, live under
		// a "cluster" subdirectory; they're recorded under the "" key.
		clusterScopedDir := filepath.Join(resourcesDir, rscDir.Name(), velerov1api.ClusterScopedDir)
		exists, err := p.fs.DirExists(clusterScopedDir)
		if err != nil {
			return nil, errors.Wrapf(err, "error checking for existence of directory %q", rel(clusterScopedDir))
		}
		if exists {
			items, err := p.getResourceItemsForScope(clusterScopedDir, dir)
			if err != nil {
				return nil, err
			}
			if len(items) > 0 {
				catalog.ItemsByNamespace[""] = items
			}
		}

		// Namespaced instances live under "namespaces/<ns-name>/"
		// subdirectories, one per namespace.
		namespaceScopedDir := filepath.Join(resourcesDir, rscDir.Name(), velerov1api.NamespaceScopedDir)
		exists, err = p.fs.DirExists(namespaceScopedDir)
		if err != nil {
			return nil, errors.Wrapf(err, "error checking for existence of directory %q", rel(namespaceScopedDir))
		}
		if exists {
			namespaceDirs, err := p.fs.ReadDir(namespaceScopedDir)
			if err != nil {
				return nil, errors.Wrapf(err, "error reading contents of directory %q", rel(namespaceScopedDir))
			}

			for _, nsDir := range namespaceDirs {
				if !nsDir.IsDir() {
					p.log.Warnf("Ignoring unexpected file %q in directory %q", nsDir.Name(), rel(namespaceScopedDir))
					continue
				}

				items, err := p.getResourceItemsForScope(filepath.Join(namespaceScopedDir, nsDir.Name()), dir)
				if err != nil {
					return nil, err
				}
				if len(items) > 0 {
					catalog.ItemsByNamespace[nsDir.Name()] = items
				}
			}
		}

		resources[rscDir.Name()] = catalog
	}

	return resources, nil
}
// getResourceItemsForScope returns the list of items within a namespace or
// cluster-scoped subdirectory for a specific resource. Returned item names
// have any ".json" file extension stripped; archiveRootDir is used only to
// shorten paths in log/error messages.
func (p *Parser) getResourceItemsForScope(dir, archiveRootDir string) ([]string, error) {
	entries, err := p.fs.ReadDir(dir)
	if err != nil {
		return nil, errors.Wrapf(err, "error reading contents of directory %q", strings.TrimPrefix(dir, archiveRootDir+"/"))
	}

	var items []string
	for _, entry := range entries {
		// only regular files are expected at this level; warn on anything else.
		if entry.IsDir() {
			p.log.Warnf("Ignoring unexpected subdirectory %q in directory %q", entry.Name(), strings.TrimPrefix(dir, archiveRootDir+"/"))
			continue
		}

		items = append(items, strings.TrimSuffix(entry.Name(), ".json"))
	}

	return items, nil
}

120
pkg/archive/parser_test.go Normal file
View File

@ -0,0 +1,120 @@
/*
Copyright 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package archive
import (
"errors"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/heptio/velero/pkg/test"
)
// TestParse verifies that Parse returns an error when the top-level
// "resources" directory is missing, and otherwise correctly catalogs
// cluster-scoped and namespaced items across multiple resource types.
func TestParse(t *testing.T) {
	tests := []struct {
		name    string
		files   []string
		dir     string
		wantErr error
		want    map[string]*ResourceItems
	}{
		{
			name:    "when there is no top-level resources directory, an error is returned",
			dir:     "root-dir",
			wantErr: errors.New("directory \"resources\" does not exist"),
		},
		{
			name:  "when there are no directories under the resources directory, an empty map is returned",
			dir:   "root-dir",
			files: []string{"root-dir/resources/"},
			want:  map[string]*ResourceItems{},
		},
		{
			name: "a mix of cluster-scoped and namespaced items across multiple resources are correctly returned",
			dir:  "root-dir",
			files: []string{
				"root-dir/resources/widgets.foo/cluster/item-1.json",
				"root-dir/resources/widgets.foo/cluster/item-2.json",
				"root-dir/resources/widgets.foo/namespaces/ns-1/item-1.json",
				"root-dir/resources/widgets.foo/namespaces/ns-1/item-2.json",
				"root-dir/resources/widgets.foo/namespaces/ns-2/item-1.json",
				"root-dir/resources/widgets.foo/namespaces/ns-2/item-2.json",
				"root-dir/resources/dongles.foo/cluster/item-3.json",
				"root-dir/resources/dongles.foo/cluster/item-4.json",
				"root-dir/resources/dongles.bar/namespaces/ns-3/item-3.json",
				"root-dir/resources/dongles.bar/namespaces/ns-3/item-4.json",
				"root-dir/resources/dongles.bar/namespaces/ns-4/item-5.json",
				"root-dir/resources/dongles.bar/namespaces/ns-4/item-6.json",
			},
			want: map[string]*ResourceItems{
				"widgets.foo": {
					GroupResource: "widgets.foo",
					ItemsByNamespace: map[string][]string{
						"":     {"item-1", "item-2"},
						"ns-1": {"item-1", "item-2"},
						"ns-2": {"item-1", "item-2"},
					},
				},
				"dongles.foo": {
					GroupResource: "dongles.foo",
					ItemsByNamespace: map[string][]string{
						"": {"item-3", "item-4"},
					},
				},
				"dongles.bar": {
					GroupResource: "dongles.bar",
					ItemsByNamespace: map[string][]string{
						"ns-3": {"item-3", "item-4"},
						"ns-4": {"item-5", "item-6"},
					},
				},
			},
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			p := &Parser{
				log: test.NewLogger(),
				fs:  test.NewFakeFileSystem(),
			}

			// populate the fake filesystem: entries ending in "/" are
			// directories; all other entries become empty files.
			for _, file := range tc.files {
				require.NoError(t, p.fs.MkdirAll(file, 0755))

				if !strings.HasSuffix(file, "/") {
					res, err := p.fs.Create(file)
					require.NoError(t, err)
					require.NoError(t, res.Close())
				}
			}

			res, err := p.Parse(tc.dir)
			if tc.wantErr != nil {
				// EqualError takes (err, expected), which avoids the swapped
				// expected/actual args of the previous assert.Equal call and
				// doesn't panic if err is unexpectedly nil.
				assert.EqualError(t, err, tc.wantErr.Error())
			} else {
				assert.NoError(t, err)
				assert.Equal(t, tc.want, res)
			}
		})
	}
}

View File

@ -22,7 +22,6 @@ import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
@ -45,6 +44,7 @@ import (
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
velerov1api "github.com/heptio/velero/pkg/apis/velero/v1"
"github.com/heptio/velero/pkg/archive"
"github.com/heptio/velero/pkg/client"
"github.com/heptio/velero/pkg/discovery"
listers "github.com/heptio/velero/pkg/generated/listers/velero/v1"
@ -272,14 +272,10 @@ func (kr *kubernetesRestorer) Restore(
volumeSnapshots: req.VolumeSnapshots,
podVolumeBackups: req.PodVolumeBackups,
resourceTerminatingTimeout: kr.resourceTerminatingTimeout,
extractor: &backupExtractor{
log: req.Log,
fileSystem: kr.fileSystem,
},
resourceClients: make(map[resourceClientKey]client.Dynamic),
restoredItems: make(map[velero.ResourceIdentifier]struct{}),
renamedPVs: make(map[string]string),
pvRenamer: kr.pvRenamer,
resourceClients: make(map[resourceClientKey]client.Dynamic),
restoredItems: make(map[velero.ResourceIdentifier]struct{}),
renamedPVs: make(map[string]string),
pvRenamer: kr.pvRenamer,
}
return restoreCtx.execute()
@ -368,7 +364,6 @@ type context struct {
volumeSnapshots []*volume.Snapshot
podVolumeBackups []*velerov1api.PodVolumeBackup
resourceTerminatingTimeout time.Duration
extractor *backupExtractor
resourceClients map[resourceClientKey]client.Dynamic
restoredItems map[velero.ResourceIdentifier]struct{}
renamedPVs map[string]string
@ -381,49 +376,26 @@ type resourceClientKey struct {
}
func (ctx *context) execute() (Result, Result) {
warnings, errs := Result{}, Result{}
ctx.log.Infof("Starting restore of backup %s", kube.NamespaceAndName(ctx.backup))
dir, err := ctx.extractor.unzipAndExtractBackup(ctx.backupReader)
dir, err := archive.NewExtractor(ctx.log, ctx.fileSystem).UnzipAndExtractBackup(ctx.backupReader)
if err != nil {
ctx.log.Infof("error unzipping and extracting: %v", err)
return Result{}, Result{Velero: []string{err.Error()}}
addVeleroError(&errs, err)
return warnings, errs
}
defer ctx.fileSystem.RemoveAll(dir)
// need to set this for additionalItems to be restored
ctx.restoreDir = dir
return ctx.restoreFromDir()
}
// restoreFromDir executes a restore based on backup data contained within a local
// directory, ctx.restoreDir.
func (ctx *context) restoreFromDir() (Result, Result) {
warnings, errs := Result{}, Result{}
// Make sure the top level "resources" dir exists:
resourcesDir := filepath.Join(ctx.restoreDir, velerov1api.ResourcesDir)
rde, err := ctx.fileSystem.DirExists(resourcesDir)
backupResources, err := archive.NewParser(ctx.log, ctx.fileSystem).Parse(ctx.restoreDir)
if err != nil {
addVeleroError(&errs, err)
addVeleroError(&errs, errors.Wrap(err, "error parsing backup contents"))
return warnings, errs
}
if !rde {
addVeleroError(&errs, errors.New("backup does not contain top level resources directory"))
return warnings, errs
}
resourceDirs, err := ctx.fileSystem.ReadDir(resourcesDir)
if err != nil {
addVeleroError(&errs, err)
return warnings, errs
}
resourceDirsMap := make(map[string]os.FileInfo)
for _, rscDir := range resourceDirs {
rscName := rscDir.Name()
resourceDirsMap[rscName] = rscDir
}
existingNamespaces := sets.NewString()
@ -434,67 +406,31 @@ func (ctx *context) restoreFromDir() (Result, Result) {
continue
}
rscDir := resourceDirsMap[resource.String()]
if rscDir == nil {
resourceList := backupResources[resource.String()]
if resourceList == nil {
continue
}
resourcePath := filepath.Join(resourcesDir, rscDir.Name())
clusterSubDir := filepath.Join(resourcePath, velerov1api.ClusterScopedDir)
clusterSubDirExists, err := ctx.fileSystem.DirExists(clusterSubDir)
if err != nil {
addVeleroError(&errs, err)
return warnings, errs
}
if clusterSubDirExists {
w, e := ctx.restoreResource(resource.String(), "", clusterSubDir)
merge(&warnings, &w)
merge(&errs, &e)
continue
}
nsSubDir := filepath.Join(resourcePath, velerov1api.NamespaceScopedDir)
nsSubDirExists, err := ctx.fileSystem.DirExists(nsSubDir)
if err != nil {
addVeleroError(&errs, err)
return warnings, errs
}
if !nsSubDirExists {
continue
}
nsDirs, err := ctx.fileSystem.ReadDir(nsSubDir)
if err != nil {
addVeleroError(&errs, err)
return warnings, errs
}
for _, nsDir := range nsDirs {
if !nsDir.IsDir() {
continue
}
nsName := nsDir.Name()
nsPath := filepath.Join(nsSubDir, nsName)
if !ctx.namespaceIncludesExcludes.ShouldInclude(nsName) {
ctx.log.Infof("Skipping namespace %s", nsName)
for namespace, items := range resourceList.ItemsByNamespace {
if namespace != "" && !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) {
ctx.log.Infof("Skipping namespace %s", namespace)
continue
}
// fetch mapped NS name
mappedNsName := nsName
if target, ok := ctx.restore.Spec.NamespaceMapping[nsName]; ok {
mappedNsName = target
// get target namespace to restore into, if different
// from source namespace
targetNamespace := namespace
if target, ok := ctx.restore.Spec.NamespaceMapping[namespace]; ok {
targetNamespace = target
}
// if we don't know whether this namespace exists yet, attempt to create
// it in order to ensure it exists. Try to get it from the backup tarball
// (in order to get any backed-up metadata), but if we don't find it there,
// create a blank one.
if !existingNamespaces.Has(mappedNsName) {
logger := ctx.log.WithField("namespace", nsName)
ns := getNamespace(logger, getItemFilePath(ctx.restoreDir, "namespaces", "", nsName), mappedNsName)
if namespace != "" && !existingNamespaces.Has(targetNamespace) {
logger := ctx.log.WithField("namespace", namespace)
ns := getNamespace(logger, getItemFilePath(ctx.restoreDir, "namespaces", "", namespace), targetNamespace)
if _, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil {
addVeleroError(&errs, err)
continue
@ -502,10 +438,10 @@ func (ctx *context) restoreFromDir() (Result, Result) {
// keep track of namespaces that we know exist so we don't
// have to try to create them multiple times
existingNamespaces.Insert(mappedNsName)
existingNamespaces.Insert(targetNamespace)
}
w, e := ctx.restoreResource(resource.String(), mappedNsName, nsPath)
w, e := ctx.restoreResource(resource.String(), targetNamespace, namespace, items)
merge(&warnings, &w)
merge(&errs, &e)
}
@ -719,36 +655,32 @@ func (ctx *context) shouldRestore(name string, pvClient client.Dynamic) (bool, e
// restoreResource restores the specified cluster or namespace scoped resource. If namespace is
// empty we are restoring a cluster level resource, otherwise into the specified namespace.
func (ctx *context) restoreResource(resource, namespace, resourcePath string) (Result, Result) {
func (ctx *context) restoreResource(resource, targetNamespace, originalNamespace string, items []string) (Result, Result) {
warnings, errs := Result{}, Result{}
if ctx.restore.Spec.IncludeClusterResources != nil && !*ctx.restore.Spec.IncludeClusterResources && namespace == "" {
if targetNamespace == "" && boolptr.IsSetToFalse(ctx.restore.Spec.IncludeClusterResources) {
ctx.log.Infof("Skipping resource %s because it's cluster-scoped", resource)
return warnings, errs
}
if namespace != "" {
ctx.log.Infof("Restoring resource '%s' into namespace '%s' from: %s", resource, namespace, resourcePath)
if targetNamespace != "" {
ctx.log.Infof("Restoring resource '%s' into namespace '%s'", resource, targetNamespace)
} else {
ctx.log.Infof("Restoring cluster level resource '%s' from: %s", resource, resourcePath)
ctx.log.Infof("Restoring cluster level resource '%s'", resource)
}
files, err := ctx.fileSystem.ReadDir(resourcePath)
if err != nil {
addToResult(&errs, namespace, fmt.Errorf("error reading %q resource directory: %v", resource, err))
return warnings, errs
}
if len(files) == 0 {
if len(items) == 0 {
return warnings, errs
}
groupResource := schema.ParseGroupResource(resource)
for _, file := range files {
fullPath := filepath.Join(resourcePath, file.Name())
obj, err := ctx.unmarshal(fullPath)
for _, item := range items {
itemPath := getItemFilePath(ctx.restoreDir, resource, originalNamespace, item)
obj, err := ctx.unmarshal(itemPath)
if err != nil {
addToResult(&errs, namespace, fmt.Errorf("error decoding %q: %v", strings.Replace(fullPath, ctx.restoreDir+"/", "", -1), err))
addToResult(&errs, targetNamespace, fmt.Errorf("error decoding %q: %v", strings.Replace(itemPath, ctx.restoreDir+"/", "", -1), err))
continue
}
@ -756,7 +688,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (R
continue
}
w, e := ctx.restoreItem(obj, groupResource, namespace)
w, e := ctx.restoreItem(obj, groupResource, targetNamespace)
merge(&warnings, &w)
merge(&errs, &e)
}

View File

@ -696,7 +696,7 @@ func TestInvalidTarballContents(t *testing.T) {
tarball: newTarWriter(t).
done(),
wantErrs: Result{
Velero: []string{"backup does not contain top level resources directory"},
Velero: []string{"error parsing backup contents: directory \"resources\" does not exist"},
},
},
{