BackupController: do as much as possible

When running a backup, try to do as much as possible, collecting errors along the way, and return an aggregate at the end. This way, for most failure modes, we'll still be able to upload the backup log file to object storage, which wasn't happening before.

Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>

parent 0045bb057d
commit 1e581f1ead
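A minimal sketch of the collect-then-aggregate pattern the message describes, using the k8s.io/apimachinery errors helper that the controller code imports as kerrors; the step names here are invented for illustration and are not the actual controller steps:

package main

import (
	"errors"
	"fmt"

	kerrors "k8s.io/apimachinery/pkg/util/errors"
)

// runSteps keeps going through every step, remembers each failure, and hands
// back a single aggregate error at the end instead of bailing out early.
func runSteps() error {
	var errs []error

	steps := []struct {
		name string
		run  func() error
	}{
		{"write backup tarball", func() error { return nil }},
		{"upload backup tarball", func() error { return errors.New("upload failed") }},
		{"upload log file", func() error { return nil }}, // still runs after the failure above
	}

	for _, step := range steps {
		if err := step.run(); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", step.name, err))
		}
	}

	// NewAggregate ignores nil entries and returns nil when nothing failed.
	return kerrors.NewAggregate(errs)
}

func main() {
	fmt.Println(runSteps())
}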
@@ -116,32 +116,63 @@ func NewBackupService(objectStore ObjectStore, logger logrus.FieldLogger) Backup
 	}
 }
 
+func seekToBeginning(r io.Reader) error {
+	seeker, ok := r.(io.Seeker)
+	if !ok {
+		return nil
+	}
+
+	_, err := seeker.Seek(0, 0)
+	return err
+}
+
+func (br *backupService) seekAndPutObject(bucket, key string, file io.Reader) error {
+	if file == nil {
+		return nil
+	}
+
+	if err := seekToBeginning(file); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return br.objectStore.PutObject(bucket, key, file)
+}
+
 func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.Reader) error {
-	// upload metadata file
-	metadataKey := getMetadataKey(backupName)
-	if err := br.objectStore.PutObject(bucket, metadataKey, metadata); err != nil {
-		// failure to upload metadata file is a hard-stop
-		return err
-	}
-
-	// upload tar file
-	if err := br.objectStore.PutObject(bucket, getBackupContentsKey(backupName), backup); err != nil {
-		// try to delete the metadata file since the data upload failed
-		deleteErr := br.objectStore.DeleteObject(bucket, metadataKey)
-
-		return kerrors.NewAggregate([]error{err, deleteErr})
-	}
-
-	// uploading log file is best-effort; if it fails, we log the error but call the overall upload a
-	// success
+	// Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the
+	// backup's status.
 	logKey := getBackupLogKey(backupName)
-	if err := br.objectStore.PutObject(bucket, logKey, log); err != nil {
+	if err := br.seekAndPutObject(bucket, logKey, log); err != nil {
 		br.logger.WithError(err).WithFields(logrus.Fields{
 			"bucket": bucket,
 			"key":    logKey,
 		}).Error("Error uploading log file")
 	}
 
+	if metadata == nil {
+		// If we don't have metadata, something failed, and there's no point in continuing. An object
+		// storage bucket that is missing the metadata file can't be restored, nor can its logs be
+		// viewed.
+		return nil
+	}
+
+	// upload metadata file
+	metadataKey := getMetadataKey(backupName)
+	if err := br.seekAndPutObject(bucket, metadataKey, metadata); err != nil {
+		// failure to upload metadata file is a hard-stop
+		return err
+	}
+
+	if backup != nil {
+		// upload tar file
+		if err := br.seekAndPutObject(bucket, getBackupContentsKey(backupName), backup); err != nil {
+			// try to delete the metadata file since the data upload failed
+			deleteErr := br.objectStore.DeleteObject(bucket, metadataKey)
+
+			return kerrors.NewAggregate([]error{err, deleteErr})
+		}
+	}
+
 	return nil
 }
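For context, seekAndPutObject above rewinds a reader before handing it to the object store, but only when the reader actually supports seeking. A self-contained sketch of that rewind step (the main function and sample data are only for illustration):

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

// seekToBeginning rewinds r if it happens to implement io.Seeker;
// non-seekable readers are passed through untouched.
func seekToBeginning(r io.Reader) error {
	seeker, ok := r.(io.Seeker)
	if !ok {
		return nil
	}

	_, err := seeker.Seek(0, 0)
	return err
}

func main() {
	r := strings.NewReader("backup log contents")

	// Pretend an earlier step already consumed the reader.
	io.Copy(ioutil.Discard, r)

	if err := seekToBeginning(r); err != nil {
		fmt.Println("seek failed:", err)
		return
	}

	data, _ := ioutil.ReadAll(r)
	fmt.Printf("re-read %d bytes after rewinding\n", len(data))
}

This is why a reader that has already been written to (like the temp files in the controller) can still be uploaded without the caller explicitly resetting its offset first.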
@@ -173,8 +204,8 @@ func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) {
 	return output, nil
 }
 
-func (br *backupService) GetBackup(bucket, name string) (*api.Backup, error) {
-	key := fmt.Sprintf(metadataFileFormatString, name)
+func (br *backupService) GetBackup(bucket, backupName string) (*api.Backup, error) {
+	key := getMetadataKey(backupName)
 
 	res, err := br.objectStore.GetObject(bucket, key)
 	if err != nil {
@@ -46,36 +46,46 @@ func TestUploadBackup(t *testing.T) {
 		expectMetadataDelete bool
 		backup               io.ReadSeeker
 		backupError          error
+		expectBackupUpload   bool
 		log                  io.ReadSeeker
 		logError             error
 		expectedErr          string
 	}{
 		{
-			name:     "normal case",
-			metadata: newStringReadSeeker("foo"),
-			backup:   newStringReadSeeker("bar"),
-			log:      newStringReadSeeker("baz"),
+			name:               "normal case",
+			metadata:           newStringReadSeeker("foo"),
+			backup:             newStringReadSeeker("bar"),
+			expectBackupUpload: true,
+			log:                newStringReadSeeker("baz"),
 		},
 		{
-			name:          "error on metadata upload does not upload data or log",
+			name:          "error on metadata upload does not upload data",
 			metadata:      newStringReadSeeker("foo"),
 			metadataError: errors.New("md"),
 			log:           newStringReadSeeker("baz"),
 			expectedErr:   "md",
 		},
 		{
			name:                 "error on data upload deletes metadata",
 			metadata:             newStringReadSeeker("foo"),
 			backup:               newStringReadSeeker("bar"),
+			expectBackupUpload:   true,
 			backupError:          errors.New("backup"),
 			expectMetadataDelete: true,
 			expectedErr:          "backup",
 		},
 		{
-			name:     "error on log upload is ok",
-			metadata: newStringReadSeeker("foo"),
-			backup:   newStringReadSeeker("bar"),
-			log:      newStringReadSeeker("baz"),
-			logError: errors.New("log"),
+			name:               "error on log upload is ok",
+			metadata:           newStringReadSeeker("foo"),
+			backup:             newStringReadSeeker("bar"),
+			expectBackupUpload: true,
+			log:                newStringReadSeeker("baz"),
+			logError:           errors.New("log"),
 		},
+		{
+			name:   "don't upload data when metadata is nil",
+			backup: newStringReadSeeker("bar"),
+			log:    newStringReadSeeker("baz"),
+		},
 	}
@@ -87,11 +97,12 @@ func TestUploadBackup(t *testing.T) {
 				backupName = "test-backup"
 				logger     = arktest.NewLogger()
 			)
+			defer objStore.AssertExpectations(t)
 
 			if test.metadata != nil {
 				objStore.On("PutObject", bucket, backupName+"/ark-backup.json", test.metadata).Return(test.metadataError)
 			}
-			if test.backup != nil {
+			if test.backup != nil && test.expectBackupUpload {
 				objStore.On("PutObject", bucket, backupName+"/"+backupName+".tar.gz", test.backup).Return(test.backupError)
 			}
 			if test.log != nil {
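The expectBackupUpload flag introduced above only registers the PutObject expectation when a test case actually expects the tarball upload to happen. A stripped-down sketch of that conditional-expectation pattern with testify's mock package; the fakeObjectStore type and the test cases here are hypothetical stand-ins, not the real generated mock:

package example

import (
	"io"
	"strings"
	"testing"

	"github.com/stretchr/testify/mock"
)

// fakeObjectStore stands in for the ObjectStore mock used by the real test.
type fakeObjectStore struct {
	mock.Mock
}

func (f *fakeObjectStore) PutObject(bucket, key string, body io.Reader) error {
	args := f.Called(bucket, key, body)
	return args.Error(0)
}

func TestConditionalExpectation(t *testing.T) {
	tests := []struct {
		name               string
		backup             io.ReadSeeker
		expectBackupUpload bool
	}{
		{name: "upload expected", backup: strings.NewReader("bar"), expectBackupUpload: true},
		{name: "upload skipped", backup: strings.NewReader("bar"), expectBackupUpload: false},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			objStore := new(fakeObjectStore)
			defer objStore.AssertExpectations(t)

			// Register the expectation only when the case says the upload should happen.
			if test.backup != nil && test.expectBackupUpload {
				objStore.On("PutObject", "bucket", "key", test.backup).Return(nil)
			}

			// Exercise the code under test; here we just simulate the call directly.
			if test.expectBackupUpload {
				_ = objStore.PutObject("bucket", "key", test.backup)
			}
		})
	}
}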
@@ -111,7 +122,6 @@ func TestUploadBackup(t *testing.T) {
 				assert.NoError(t, err)
 			}
 
-			objStore.AssertExpectations(t)
 		})
 	}
 }
@@ -372,5 +382,5 @@ func newStringReadSeeker(s string) *stringReadSeeker {
 }
 
 func (srs *stringReadSeeker) Seek(offset int64, whence int) (int64, error) {
-	panic("not implemented")
+	return 0, nil
 }
@@ -21,6 +21,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"sync"
@@ -32,7 +33,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/clock"
-	kuberrs "k8s.io/apimachinery/pkg/util/errors"
+	kerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/cache"
@@ -211,13 +212,14 @@ func (controller *backupController) processBackup(key string) error {
 		return errors.Wrap(err, "error getting backup")
 	}
 
-	// TODO I think this is now unnecessary. We only initially place
-	// item with Phase = ("" | New) into the queue. Items will only get
-	// re-queued if syncHandler returns an error, which will only
-	// happen if there's an error updating Phase from its initial
-	// state to something else. So any time it's re-queued it will
-	// still have its initial state, which we've already confirmed
-	// is ("" | New)
+	// Double-check we have the correct phase. In the unlikely event that multiple controller
+	// instances are running, it's possible for controller A to succeed in changing the phase to
+	// InProgress, while controller B's attempt to patch the phase fails. When controller B
+	// reprocesses the same backup, it will either show up as New (informer hasn't seen the update
+	// yet) or as InProgress. In the former case, the patch attempt will fail again, until the
+	// informer sees the update. In the latter case, after the informer has seen the update to
+	// InProgress, we still need this check so we can return nil to indicate we've finished processing
+	// this key (even though it was a no-op).
 	switch backup.Status.Phase {
 	case "", api.BackupPhaseNew:
 		// only process new backups
@@ -317,39 +319,20 @@ func (controller *backupController) getValidationErrors(itm *api.Backup) []strin
 }
 
 func (controller *backupController) runBackup(backup *api.Backup, bucket string) error {
-	backupFile, err := ioutil.TempFile("", "")
-	if err != nil {
-		return errors.Wrap(err, "error creating temp file for Backup")
-	}
+	log := controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup))
+	log.Info("Starting backup")
 
 	logFile, err := ioutil.TempFile("", "")
 	if err != nil {
-		return errors.Wrap(err, "error creating temp file for Backup log")
+		return errors.Wrap(err, "error creating temp file for backup log")
 	}
+	defer closeAndRemoveFile(logFile, log)
 
-	defer func() {
-		var errs []error
-		// TODO should this be wrapped?
-		errs = append(errs, err)
-
-		if err := backupFile.Close(); err != nil {
-			errs = append(errs, errors.Wrap(err, "error closing Backup temp file"))
-		}
-
-		if err := os.Remove(backupFile.Name()); err != nil {
-			errs = append(errs, errors.Wrap(err, "error removing Backup temp file"))
-		}
-
-		if err := logFile.Close(); err != nil {
-			errs = append(errs, errors.Wrap(err, "error closing Backup log temp file"))
-		}
-
-		if err := os.Remove(logFile.Name()); err != nil {
-			errs = append(errs, errors.Wrap(err, "error removing Backup log temp file"))
-		}
-
-		err = kuberrs.NewAggregate(errs)
-	}()
+	backupFile, err := ioutil.TempFile("", "")
+	if err != nil {
+		return errors.Wrap(err, "error creating temp file for backup")
+	}
+	defer closeAndRemoveFile(backupFile, log)
 
 	actions, err := controller.pluginManager.GetBackupItemActions(backup.Name)
 	if err != nil {
@@ -357,30 +340,42 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string)
 	}
 	defer controller.pluginManager.CloseBackupItemActions(backup.Name)
 
-	controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("starting backup")
+	var errs []error
+
+	var backupJsonToUpload, backupFileToUpload io.Reader
+
 	// Do the actual backup
 	if err := controller.backupper.Backup(backup, backupFile, logFile, actions); err != nil {
-		return err
+		errs = append(errs, err)
+
+		backup.Status.Phase = api.BackupPhaseFailed
+	} else {
+		backup.Status.Phase = api.BackupPhaseCompleted
 	}
 
-	controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("backup completed")
-
-	// note: updating this here so the uploaded JSON shows "completed". If
-	// the upload fails, we'll alter the phase in the calling func.
-	backup.Status.Phase = api.BackupPhaseCompleted
-
-	buf := new(bytes.Buffer)
-	if err := encode.EncodeTo(backup, "json", buf); err != nil {
-		return errors.Wrap(err, "error encoding Backup")
+	backupJson := new(bytes.Buffer)
+	if err := encode.EncodeTo(backup, "json", backupJson); err != nil {
+		errs = append(errs, errors.Wrap(err, "error encoding backup"))
+	} else {
+		// Only upload the json and backup tarball if encoding to json succeeded.
+		backupJsonToUpload = backupJson
+		backupFileToUpload = backupFile
 	}
 
-	// re-set the files' offset to 0 for reading
-	if _, err = backupFile.Seek(0, 0); err != nil {
-		return errors.Wrap(err, "error resetting Backup file offset")
-	}
-	if _, err = logFile.Seek(0, 0); err != nil {
-		return errors.Wrap(err, "error resetting Backup log file offset")
+	if err := controller.backupService.UploadBackup(bucket, backup.Name, backupJsonToUpload, backupFileToUpload, logFile); err != nil {
+		errs = append(errs, err)
 	}
 
-	return controller.backupService.UploadBackup(bucket, backup.Name, buf, backupFile, logFile)
+	log.Info("Backup completed")
+
+	return kerrors.NewAggregate(errs)
 }
+
+func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) {
+	if err := file.Close(); err != nil {
+		log.WithError(err).WithField("file", file.Name()).Error("error closing file")
+	}
+	if err := os.Remove(file.Name()); err != nil {
+		log.WithError(err).WithField("file", file.Name()).Error("error removing file")
+	}
+}
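The defer-based cleanup above replaces the old aggregate-building closure: closeAndRemoveFile logs cleanup failures instead of folding them into the function's return value. A minimal, self-contained sketch of that pattern, using the standard library logger in place of logrus:

package main

import (
	"io/ioutil"
	"log"
	"os"
)

// closeAndRemove mirrors closeAndRemoveFile above: cleanup problems are logged
// but never turned into a return value that could mask the real result.
func closeAndRemove(file *os.File) {
	if err := file.Close(); err != nil {
		log.Printf("error closing file %s: %v", file.Name(), err)
	}
	if err := os.Remove(file.Name()); err != nil {
		log.Printf("error removing file %s: %v", file.Name(), err)
	}
}

func main() {
	logFile, err := ioutil.TempFile("", "")
	if err != nil {
		log.Fatalf("error creating temp file: %v", err)
	}
	defer closeAndRemove(logFile)

	if _, err := logFile.WriteString("backup log goes here\n"); err != nil {
		log.Printf("error writing log: %v", err)
	}
}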