Save backup log file to object storage

Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>

parent b50a78cf7b
commit 9848a7a55b
@@ -25,8 +25,6 @@ import (
     "strings"
     "time"
 
-    "github.com/golang/glog"
-
     "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
@@ -43,7 +41,7 @@ import (
 type Backupper interface {
     // Backup takes a backup using the specification in the api.Backup and writes backup data to the
     // given writers.
-    Backup(backup *api.Backup, data io.Writer) error
+    Backup(backup *api.Backup, data, log io.Writer) error
 }
 
 // kubernetesBackupper implements Backupper.
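The interface change above threads a second writer through Backup: one for the tarball, one for the backup log. As a hedged illustration only (not part of this commit), here is a minimal self-contained sketch of driving the two-writer signature with in-memory buffers, the way the updated tests do; the Backup struct and Backupper interface below are local stand-ins for the api.Backup type and the interface in the diff.

package backupexample

import (
    "bytes"
    "fmt"
    "io"
)

// Backup is a stand-in for api.Backup; only the shape of the call matters here.
type Backup struct{ Name string }

// Backupper mirrors the updated interface: Backup writes the tarball to data
// and the backup log to log.
type Backupper interface {
    Backup(backup *Backup, data, log io.Writer) error
}

// runToBuffers drives the two-writer signature with in-memory buffers.
func runToBuffers(b Backupper, spec *Backup) error {
    data := new(bytes.Buffer)   // receives the gzip-compressed tar stream
    logBuf := new(bytes.Buffer) // receives the gzip-compressed backup log
    if err := b.Backup(spec, data, logBuf); err != nil {
        return err
    }
    fmt.Printf("backup %q: %d data bytes, %d log bytes\n", spec.Name, data.Len(), logBuf.Len())
    return nil
}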
@@ -98,29 +96,34 @@ func resolveActions(mapper meta.RESTMapper, actions map[string]Action) (map[sche
     return ret, nil
 }
 
-// getResourceIncludesExcludes takes the lists of resources to include and exclude from the
-// backup, uses the RESTMapper to resolve them to fully-qualified group-resource names, and returns
-// an IncludesExcludes list.
-func getResourceIncludesExcludes(mapper meta.RESTMapper, backup *api.Backup) *collections.IncludesExcludes {
+// resolveResources uses the RESTMapper to resolve resources to their fully-qualified group-resource
+// names. fn is invoked for each resolved resource. resolveResources returns a list of any resources that failed to resolve.
+func (ctx *backupContext) resolveResources(mapper meta.RESTMapper, resources []string, allowAll bool, fn func(string)) {
+    for _, resource := range resources {
+        if allowAll && resource == "*" {
+            fn("*")
+            return
+        }
+        gr, err := resolveGroupResource(mapper, resource)
+        if err != nil {
+            ctx.log("Unable to resolve resource %q: %v", resource, err)
+            continue
+        }
+        fn(gr.String())
+    }
+}
+
+// getResourceIncludesExcludes takes the lists of resources to include and exclude, uses the
+// RESTMapper to resolve them to fully-qualified group-resource names, and returns an
+// IncludesExcludes list.
+func (ctx *backupContext) getResourceIncludesExcludes(mapper meta.RESTMapper, includes, excludes []string) *collections.IncludesExcludes {
     resources := collections.NewIncludesExcludes()
 
-    resolve := func(list []string, allowAll bool, f func(string)) {
-        for _, resource := range list {
-            if allowAll && resource == "*" {
-                f("*")
-                return
-            }
-            gr, err := resolveGroupResource(mapper, resource)
-            if err != nil {
-                glog.Errorf("unable to include resource %q in backup: %v", resource, err)
-                continue
-            }
-            f(gr.String())
-        }
-    }
+    ctx.resolveResources(mapper, includes, true, func(s string) { resources.Includes(s) })
+    ctx.resolveResources(mapper, excludes, false, func(s string) { resources.Excludes(s) })
 
-    resolve(backup.Spec.IncludedResources, true, func(s string) { resources.Includes(s) })
-    resolve(backup.Spec.ExcludedResources, false, func(s string) { resources.Excludes(s) })
+    ctx.log("Including resources: %v", strings.Join(resources.GetIncludes(), ", "))
+    ctx.log("Excluding resources: %v", strings.Join(resources.GetExcludes(), ", "))
 
     return resources
 }
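The refactor above splits resolution out into resolveResources and reports failures through ctx.log so they land in the backup log. As a hedged illustration, the stand-alone sketch below mirrors the same loop with a toy resolver in place of resolveGroupResource and the RESTMapper; it shows the allowAll/"*" short-circuit that lets the include list (but not the exclude list) use the wildcard. Everything here is a hypothetical stand-in, not code from the commit.

package backupexample

import "fmt"

// resolveAll mirrors the resolveResources loop from the diff: it walks short
// resource names, short-circuits on "*" when allowAll is true, and hands each
// resolved name to fn. resolve stands in for resolveGroupResource + RESTMapper.
func resolveAll(resources []string, allowAll bool, resolve func(string) (string, error), fn func(string)) {
    for _, resource := range resources {
        if allowAll && resource == "*" {
            fn("*") // include everything; nothing further to resolve
            return
        }
        full, err := resolve(resource)
        if err != nil {
            fmt.Printf("Unable to resolve resource %q: %v\n", resource, err)
            continue
        }
        fn(full)
    }
}

Calling it once with the include list and allowAll=true, then with the exclude list and allowAll=false, reproduces how getResourceIncludesExcludes builds its IncludesExcludes.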
@@ -144,6 +147,7 @@ func getNamespaceIncludesExcludes(backup *api.Backup) *collections.IncludesExclu
 type backupContext struct {
     backup *api.Backup
     w      tarWriter
+    logger io.Writer
     namespaceIncludesExcludes *collections.IncludesExcludes
     resourceIncludesExcludes  *collections.IncludesExcludes
     // deploymentsBackedUp marks whether we've seen and are backing up the deployments resource, from
@@ -156,32 +160,52 @@ type backupContext struct {
     networkPoliciesBackedUp bool
 }
 
+func (ctx *backupContext) log(msg string, args ...interface{}) {
+    // TODO use a real logger that supports writing to files
+    now := time.Now().Format(time.RFC3339)
+    fmt.Fprintf(ctx.logger, now+" "+msg+"\n", args...)
+}
+
 // Backup backs up the items specified in the Backup, placing them in a gzip-compressed tar file
 // written to data. The finalized api.Backup is written to metadata.
-func (kb *kubernetesBackupper) Backup(backup *api.Backup, data io.Writer) error {
-    gzw := gzip.NewWriter(data)
-    defer gzw.Close()
+func (kb *kubernetesBackupper) Backup(backup *api.Backup, data, log io.Writer) error {
+    gzippedData := gzip.NewWriter(data)
+    defer gzippedData.Close()
 
-    tw := tar.NewWriter(gzw)
+    tw := tar.NewWriter(gzippedData)
     defer tw.Close()
 
+    gzippedLog := gzip.NewWriter(log)
+    defer gzippedLog.Close()
+
     var errs []error
 
     ctx := &backupContext{
        backup: backup,
        w:      tw,
+       logger: gzippedLog,
        namespaceIncludesExcludes: getNamespaceIncludesExcludes(backup),
-       resourceIncludesExcludes:  getResourceIncludesExcludes(kb.discoveryHelper.Mapper(), backup),
     }
 
+    ctx.log("Starting backup")
+
+    ctx.resourceIncludesExcludes = ctx.getResourceIncludesExcludes(kb.discoveryHelper.Mapper(), backup.Spec.IncludedResources, backup.Spec.ExcludedResources)
+
     for _, group := range kb.discoveryHelper.Resources() {
-        glog.V(2).Infof("Backing up group %q\n", group.GroupVersion)
+        ctx.log("Processing group %s", group.GroupVersion)
        if err := kb.backupGroup(ctx, group); err != nil {
            errs = append(errs, err)
        }
     }
 
-    return kuberrs.NewAggregate(errs)
+    err := kuberrs.NewAggregate(errs)
+    if err == nil {
+        ctx.log("Backup completed successfully")
+    } else {
+        ctx.log("Backup completed with errors: %v", err)
+    }
+
+    return err
 }
 
 type tarWriter interface {
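Backup wraps both writers in gzip, so whatever the caller passes as log ends up containing a gzip stream, mirroring the .tar.gz data stream. As a hedged sketch using only the standard library (readGzippedLog is hypothetical, not part of the commit), a consumer could recover the plain-text log like this:

package backupexample

import (
    "compress/gzip"
    "io"
    "io/ioutil"
)

// readGzippedLog decompresses a log stream produced by Backup's gzip writer.
// r would typically be the buffer or file the caller passed as the log writer.
func readGzippedLog(r io.Reader) (string, error) {
    gz, err := gzip.NewReader(r)
    if err != nil {
        return "", err
    }
    defer gz.Close()

    contents, err := ioutil.ReadAll(gz)
    if err != nil {
        return "", err
    }
    return string(contents), nil
}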
@@ -195,7 +219,7 @@ func (kb *kubernetesBackupper) backupGroup(ctx *backupContext, group *metav1.API
     var errs []error
 
     for _, resource := range group.APIResources {
-        glog.V(2).Infof("Backing up resource %s/%s\n", group.GroupVersion, resource.Name)
+        ctx.log("Processing resource %s/%s", group.GroupVersion, resource.Name)
        if err := kb.backupResource(ctx, group, resource); err != nil {
            errs = append(errs, err)
        }

@@ -228,7 +252,7 @@ func (kb *kubernetesBackupper) backupResource(
     grString := gr.String()
 
     if !ctx.resourceIncludesExcludes.ShouldInclude(grString) {
-        glog.V(2).Infof("Not including resource %s\n", grString)
+        ctx.log("Resource %s is excluded", grString)
        return nil
     }
 

@@ -240,7 +264,7 @@ func (kb *kubernetesBackupper) backupResource(
        } else {
            other = appsDeploymentsResource
        }
-        glog.V(4).Infof("Skipping resource %q because it's a duplicate of %q", grString, other)
+        ctx.log("Skipping resource %q because it's a duplicate of %q", grString, other)
        return nil
     }
 

@@ -255,7 +279,7 @@ func (kb *kubernetesBackupper) backupResource(
        } else {
            other = networkingNetworkPoliciesResource
        }
-        glog.V(4).Infof("Skipping resource %q because it's a duplicate of %q", grString, other)
+        ctx.log("Skipping resource %q because it's a duplicate of %q", grString, other)
        return nil
     }
 

@@ -357,20 +381,20 @@ func (*realItemBackupper) backupItem(ctx *backupContext, item map[string]interfa
     namespace, err := collections.GetString(metadata, "namespace")
     if err == nil {
        if !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) {
-            glog.V(2).Infof("Excluding item %s because namespace %s is excluded\n", name, namespace)
+            ctx.log("Excluding item %s because namespace %s is excluded", name, namespace)
            return nil
        }
     }
 
     if action != nil {
-        glog.V(4).Infof("Executing action on %s, ns=%s, name=%s", groupResource, namespace, name)
+        ctx.log("Executing action on %s, ns=%s, name=%s", groupResource, namespace, name)
 
        if err := action.Execute(item, ctx.backup); err != nil {
            return err
        }
     }
 
-    glog.V(2).Infof("Backing up resource=%s, ns=%s, name=%s", groupResource, namespace, name)
+    ctx.log("Backing up resource=%s, ns=%s, name=%s", groupResource, namespace, name)
 
     var filePath string
     if namespace != "" {
@@ -164,6 +164,13 @@ func TestGetResourceIncludesExcludes(t *testing.T) {
            expectedIncludes: []string{"foodies.somegroup", "fields.somegroup"},
            expectedExcludes: []string{"barnacles.anothergroup", "bazaars.anothergroup"},
        },
+        {
+            name:             "some unresolvable",
+            includes:         []string{"foo", "fie", "bad1"},
+            excludes:         []string{"bar", "baz", "bad2"},
+            expectedIncludes: []string{"foodies.somegroup", "fields.somegroup"},
+            expectedExcludes: []string{"barnacles.anothergroup", "bazaars.anothergroup"},
+        },
     }
 
     for _, test := range tests {

@@ -177,14 +184,12 @@ func TestGetResourceIncludesExcludes(t *testing.T) {
                },
            }
 
-            backup := &v1.Backup{
-                Spec: v1.BackupSpec{
-                    IncludedResources: test.includes,
-                    ExcludedResources: test.excludes,
-                },
+            b := new(bytes.Buffer)
+            ctx := &backupContext{
+                logger: b,
            }
 
-            actual := getResourceIncludesExcludes(mapper, backup)
+            actual := ctx.getResourceIncludesExcludes(mapper, test.includes, test.excludes)
 
            sort.Strings(test.expectedIncludes)
            actualIncludes := actual.GetIncludes()

@@ -439,7 +444,8 @@ func TestBackupMethod(t *testing.T) {
     require.NoError(t, err)
 
     output := new(bytes.Buffer)
-    err = backupper.Backup(backup, output)
+    log := new(bytes.Buffer)
+    err = backupper.Backup(backup, output, log)
     require.NoError(t, err)
 
     expectedFiles := sets.NewString(

@@ -775,6 +781,7 @@ func TestBackupResource(t *testing.T) {
                namespaceIncludesExcludes: test.namespaceIncludesExcludes,
                deploymentsBackedUp:       test.deploymentsBackedUp,
                networkPoliciesBackedUp:   test.networkPoliciesBackedUp,
+                logger:                    new(bytes.Buffer),
            }
 
            group := &metav1.APIResourceList{

@@ -1001,7 +1008,8 @@ func TestBackupItem(t *testing.T) {
            ctx := &backupContext{
                backup: backup,
                namespaceIncludesExcludes: namespaces,
-                w: w,
+                w:      w,
+                logger: new(bytes.Buffer),
            }
            b := &realItemBackupper{}
            err = b.backupItem(ctx, item, "resource.group", actionParam)

@@ -38,7 +38,7 @@ type BackupService interface {
     // UploadBackup uploads the specified Ark backup of a set of Kubernetes API objects, whose manifests are
     // stored in the specified file, into object storage in an Ark bucket, tagged with Ark metadata. Returns
     // an error if a problem is encountered accessing the file or performing the upload via the cloud API.
-    UploadBackup(bucket, name string, metadata, backup io.ReadSeeker) error
+    UploadBackup(bucket, name string, metadata, backup, log io.ReadSeeker) error
 
     // DownloadBackup downloads an Ark backup with the specified object key from object storage via the cloud API.
     // It returns the snapshot metadata and data (separately), or an error if a problem is encountered
@@ -62,8 +62,9 @@ type BackupGetter interface {
 }
 
 const (
-    metadataFileFormatString string = "%s/ark-backup.json"
-    backupFileFormatString   string = "%s/%s.tar.gz"
+    metadataFileFormatString = "%s/ark-backup.json"
+    backupFileFormatString   = "%s/%s.tar.gz"
+    logFileFormatString      = "%s/%s.log.gz"
 )
 
 type backupService struct {
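The new logFileFormatString follows the existing key layout: every object for a backup sits under a prefix named after the backup. As a hedged illustration (objectKeys is hypothetical), the keys these formats produce for a backup named "test-backup" match the expectations in the tests further down:

package backupexample

import "fmt"

// objectKeys applies the same format strings as the const block above.
// For backupName = "test-backup" it returns:
//   test-backup/ark-backup.json
//   test-backup/test-backup.tar.gz
//   test-backup/test-backup.log.gz
func objectKeys(backupName string) (metadataKey, backupKey, logKey string) {
    metadataKey = fmt.Sprintf("%s/ark-backup.json", backupName)
    backupKey = fmt.Sprintf("%s/%s.tar.gz", backupName, backupName)
    logKey = fmt.Sprintf("%s/%s.log.gz", backupName, backupName)
    return metadataKey, backupKey, logKey
}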
@@ -82,21 +83,30 @@ func NewBackupService(objectStorage ObjectStorageAdapter) BackupService {
     }
 }
 
-func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup io.ReadSeeker) error {
+func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.ReadSeeker) error {
     // upload metadata file
     metadataKey := fmt.Sprintf(metadataFileFormatString, backupName)
     if err := br.objectStorage.PutObject(bucket, metadataKey, metadata); err != nil {
        // failure to upload metadata file is a hard-stop
        return err
     }
 
     // upload tar file
-    if err := br.objectStorage.PutObject(bucket, fmt.Sprintf(backupFileFormatString, backupName, backupName), backup); err != nil {
+    backupKey := fmt.Sprintf(backupFileFormatString, backupName, backupName)
+    if err := br.objectStorage.PutObject(bucket, backupKey, backup); err != nil {
        // try to delete the metadata file since the data upload failed
        deleteErr := br.objectStorage.DeleteObject(bucket, metadataKey)
 
        return errors.NewAggregate([]error{err, deleteErr})
     }
 
+    // uploading log file is best-effort; if it fails, we log the error but call the overall upload a
+    // success
+    logKey := fmt.Sprintf(logFileFormatString, backupName, backupName)
+    if err := br.objectStorage.PutObject(bucket, logKey, log); err != nil {
+        glog.Errorf("error uploading %s/%s: %v", bucket, logKey, err)
+    }
+
     return nil
 }
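The implementation above makes the log upload best-effort: a metadata failure is a hard stop, a data failure rolls back the metadata object, and a log failure is only logged before returning success. As a hedged sketch of calling the widened five-argument signature, the uploader interface and uploadExample function below are hypothetical stand-ins for the real BackupService; strings.NewReader is used because *strings.Reader satisfies io.ReadSeeker, much like the tests' newStringReadSeeker helper.

package backupexample

import (
    "io"
    "strings"
)

// uploader captures the shape of the updated UploadBackup signature.
type uploader interface {
    UploadBackup(bucket, name string, metadata, backup, log io.ReadSeeker) error
}

// uploadExample feeds UploadBackup three seekable in-memory payloads, the same
// way the tests build their inputs.
func uploadExample(svc uploader) error {
    metadata := strings.NewReader(`{"kind":"Backup"}`) // placeholder metadata JSON
    backup := strings.NewReader("tar.gz bytes")        // placeholder backup archive
    logStream := strings.NewReader("log.gz bytes")     // placeholder backup log
    return svc.UploadBackup("test-bucket", "test-backup", metadata, backup, logStream)
}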
@@ -43,6 +43,7 @@ func TestUploadBackup(t *testing.T) {
        backupName      string
        metadata        io.ReadSeeker
        backup          io.ReadSeeker
+        log             io.ReadSeeker
        objectStoreErrs map[string]map[string]interface{}
        expectedErr     bool
        expectedRes     map[string][]byte

@@ -54,10 +55,12 @@ func TestUploadBackup(t *testing.T) {
            backupName:  "test-backup",
            metadata:    newStringReadSeeker("foo"),
            backup:      newStringReadSeeker("bar"),
+            log:         newStringReadSeeker("baz"),
            expectedErr: false,
            expectedRes: map[string][]byte{
                "test-backup/ark-backup.json":    []byte("foo"),
                "test-backup/test-backup.tar.gz": []byte("bar"),
+                "test-backup/test-backup.log.gz": []byte("baz"),
            },
        },
        {

@@ -68,12 +71,13 @@ func TestUploadBackup(t *testing.T) {
            expectedErr: true,
        },
        {
-            name:         "error on metadata upload does not upload data",
+            name:         "error on metadata upload does not upload data or log",
            bucket:       "test-bucket",
            bucketExists: true,
            backupName:   "test-backup",
            metadata:     newStringReadSeeker("foo"),
            backup:       newStringReadSeeker("bar"),
+            log:          newStringReadSeeker("baz"),
            objectStoreErrs: map[string]map[string]interface{}{
                "putobject": map[string]interface{}{
                    "test-bucket||test-backup/ark-backup.json": true,

@@ -89,6 +93,7 @@ func TestUploadBackup(t *testing.T) {
            backupName: "test-backup",
            metadata:   newStringReadSeeker("foo"),
            backup:     newStringReadSeeker("bar"),
+            log:        newStringReadSeeker("baz"),
            objectStoreErrs: map[string]map[string]interface{}{
                "putobject": map[string]interface{}{
                    "test-bucket||test-backup/test-backup.tar.gz": true,

@@ -97,6 +102,25 @@ func TestUploadBackup(t *testing.T) {
            expectedErr: true,
            expectedRes: make(map[string][]byte),
        },
+        {
+            name:         "error on log upload is ok",
+            bucket:       "test-bucket",
+            bucketExists: true,
+            backupName:   "test-backup",
+            metadata:     newStringReadSeeker("foo"),
+            backup:       newStringReadSeeker("bar"),
+            log:          newStringReadSeeker("baz"),
+            objectStoreErrs: map[string]map[string]interface{}{
+                "putobject": map[string]interface{}{
+                    "test-bucket||test-backup/test-backup.log.gz": true,
+                },
+            },
+            expectedErr: false,
+            expectedRes: map[string][]byte{
+                "test-backup/ark-backup.json":    []byte("foo"),
+                "test-backup/test-backup.tar.gz": []byte("bar"),
+            },
+        },
     }
 
     for _, test := range tests {

@@ -111,7 +135,7 @@ func TestUploadBackup(t *testing.T) {
 
            backupService := NewBackupService(objStore)
 
-            err := backupService.UploadBackup(test.bucket, test.backupName, test.metadata, test.backup)
+            err := backupService.UploadBackup(test.bucket, test.backupName, test.metadata, test.backup, test.log)
 
            assert.Equal(t, test.expectedErr, err != nil, "got error %v", err)
 

@@ -312,21 +312,36 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string)
     if err != nil {
        return err
     }
 
+    logFile, err := ioutil.TempFile("", "")
+    if err != nil {
+        return err
+    }
+
     defer func() {
        var errs []error
        errs = append(errs, err)
 
-        if closeErr := backupFile.Close(); closeErr != nil {
-            errs = append(errs, closeErr)
+        if err := backupFile.Close(); err != nil {
+            errs = append(errs, err)
        }
-        if removeErr := os.Remove(backupFile.Name()); removeErr != nil {
-            errs = append(errs, removeErr)
+
+        if err := os.Remove(backupFile.Name()); err != nil {
+            errs = append(errs, err)
        }
+
+        if err := logFile.Close(); err != nil {
+            errs = append(errs, err)
+        }
+
+        if err := os.Remove(logFile.Name()); err != nil {
+            errs = append(errs, err)
+        }
+
        err = kuberrs.NewAggregate(errs)
     }()
 
-    if err := controller.backupper.Backup(backup, backupFile); err != nil {
+    if err := controller.backupper.Backup(backup, backupFile, logFile); err != nil {
        return err
     }
 
|
@ -340,11 +355,13 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string)
|
|||
return err
|
||||
}
|
||||
|
||||
// re-set the file offset to 0 for reading
|
||||
_, err = backupFile.Seek(0, 0)
|
||||
if err != nil {
|
||||
// re-set the files' offset to 0 for reading
|
||||
if _, err = backupFile.Seek(0, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = logFile.Seek(0, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return controller.backupService.UploadBackup(bucket, backup.Name, bytes.NewReader(buf.Bytes()), backupFile)
|
||||
return controller.backupService.UploadBackup(bucket, backup.Name, bytes.NewReader(buf.Bytes()), backupFile, logFile)
|
||||
}
|
||||
|
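runBackup now stages the backup and its log in temporary files, rewinds both with Seek(0, 0), hands them to UploadBackup, and relies on the deferred function to close and remove them. The following hedged sketch generalizes that stage-rewind-consume-cleanup pattern using only the standard library; withTempFile is a hypothetical helper, not something this commit adds.

package backupexample

import (
    "io"
    "io/ioutil"
    "os"
)

// withTempFile creates a temporary file, lets fill write into it, rewinds it,
// and passes it to consume; the deferred cleanup closes and removes the file,
// mirroring how runBackup handles backupFile and logFile.
func withTempFile(fill func(io.Writer) error, consume func(io.ReadSeeker) error) (err error) {
    f, err := ioutil.TempFile("", "")
    if err != nil {
        return err
    }
    defer func() {
        if closeErr := f.Close(); closeErr != nil && err == nil {
            err = closeErr
        }
        if removeErr := os.Remove(f.Name()); removeErr != nil && err == nil {
            err = removeErr
        }
    }()

    if err := fill(f); err != nil {
        return err
    }
    // re-set the file offset to 0 for reading, as runBackup does before uploading
    if _, err := f.Seek(0, 0); err != nil {
        return err
    }
    return consume(f)
}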
@@ -40,8 +40,8 @@ type fakeBackupper struct {
     mock.Mock
 }
 
-func (b *fakeBackupper) Backup(backup *v1.Backup, data io.Writer) error {
-    args := b.Called(backup, data)
+func (b *fakeBackupper) Backup(backup *v1.Backup, data, log io.Writer) error {
+    args := b.Called(backup, data, log)
     return args.Error(0)
 }
 

@@ -200,9 +200,9 @@ func TestProcessBackup(t *testing.T) {
        backup.Status.Phase = v1.BackupPhaseInProgress
        backup.Status.Expiration.Time = expiration
        backup.Status.Version = 1
-        backupper.On("Backup", backup, mock.Anything).Return(nil)
+        backupper.On("Backup", backup, mock.Anything, mock.Anything).Return(nil)
 
-        cloudBackups.On("UploadBackup", "bucket", backup.Name, mock.Anything, mock.Anything).Return(nil)
+        cloudBackups.On("UploadBackup", "bucket", backup.Name, mock.Anything, mock.Anything, mock.Anything).Return(nil)
     }
 
     // this is necessary so the Update() call returns the appropriate object

@@ -545,8 +545,8 @@ func (s *fakeBackupService) GetBackup(bucket, name string) (*api.Backup, error)
     return nil, errors.New("backup not found")
 }
 
-func (bs *fakeBackupService) UploadBackup(bucket, name string, metadata, backup io.ReadSeeker) error {
-    args := bs.Called(bucket, name, metadata, backup)
+func (bs *fakeBackupService) UploadBackup(bucket, name string, metadata, backup, log io.ReadSeeker) error {
+    args := bs.Called(bucket, name, metadata, backup, log)
     return args.Error(0)
 }
 