during restore, try to get backup directly from obj storage if not in cache/API
Signed-off-by: Steve Kriss <steve@heptio.com>pull/57/head
parent
b20feee7f9
commit
f07a70c604
|
@ -25,6 +25,7 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/errors"
|
||||
|
||||
api "github.com/heptio/ark/pkg/apis/ark/v1"
|
||||
|
@ -46,6 +47,9 @@ type BackupService interface {
|
|||
|
||||
// DeleteBackup deletes the backup content in object storage for the given api.Backup.
|
||||
DeleteBackup(bucket, backupName string) error
|
||||
|
||||
// GetBackup gets the specified api.Backup from the given bucket in object storage.
|
||||
GetBackup(bucket, name string) (*api.Backup, error)
|
||||
}
|
||||
|
||||
// BackupGetter knows how to list backups in object storage.
|
||||
|
@ -61,6 +65,7 @@ const (
|
|||
|
||||
type backupService struct {
|
||||
objectStorage ObjectStorageAdapter
|
||||
decoder runtime.Decoder
|
||||
}
|
||||
|
||||
var _ BackupService = &backupService{}
|
||||
|
@ -70,6 +75,7 @@ var _ BackupGetter = &backupService{}
|
|||
func NewBackupService(objectStorage ObjectStorageAdapter) BackupService {
|
||||
return &backupService{
|
||||
objectStorage: objectStorage,
|
||||
decoder: scheme.Codecs.UniversalDecoder(api.SchemeGroupVersion),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,46 +112,45 @@ func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) {
|
|||
|
||||
output := make([]*api.Backup, 0, len(prefixes))
|
||||
|
||||
decoder := scheme.Codecs.UniversalDecoder(api.SchemeGroupVersion)
|
||||
|
||||
for _, backupDir := range prefixes {
|
||||
err := func() error {
|
||||
key := fmt.Sprintf(metadataFileFormatString, backupDir)
|
||||
|
||||
res, err := br.objectStorage.GetObject(bucket, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Close()
|
||||
|
||||
data, err := ioutil.ReadAll(res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
obj, _, err := decoder.Decode(data, nil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
backup, ok := obj.(*api.Backup)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected type for %s/%s: %T", bucket, key, obj)
|
||||
}
|
||||
|
||||
output = append(output, backup)
|
||||
|
||||
return nil
|
||||
}()
|
||||
|
||||
backup, err := br.GetBackup(bucket, backupDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
output = append(output, backup)
|
||||
}
|
||||
|
||||
return output, nil
|
||||
}
|
||||
|
||||
func (br *backupService) GetBackup(bucket, name string) (*api.Backup, error) {
|
||||
key := fmt.Sprintf(metadataFileFormatString, name)
|
||||
|
||||
res, err := br.objectStorage.GetObject(bucket, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer res.Close()
|
||||
|
||||
data, err := ioutil.ReadAll(res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
obj, _, err := br.decoder.Decode(data, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
backup, ok := obj.(*api.Backup)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type for %s/%s: %T", bucket, key, obj)
|
||||
}
|
||||
|
||||
return backup, nil
|
||||
}
|
||||
|
||||
func (br *backupService) DeleteBackup(bucket, backupName string) error {
|
||||
var errs []error
|
||||
|
||||
|
|
|
@ -319,6 +319,21 @@ func (s *fakeBackupService) GetAllBackups(bucket string) ([]*api.Backup, error)
|
|||
return backups, nil
|
||||
}
|
||||
|
||||
func (s *fakeBackupService) GetBackup(bucket, name string) (*api.Backup, error) {
|
||||
backups, err := s.GetAllBackups(bucket)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, itm := range backups {
|
||||
if itm.Name == name {
|
||||
return itm, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.New("backup not found")
|
||||
}
|
||||
|
||||
func (bs *fakeBackupService) UploadBackup(bucket, name string, metadata, backup io.ReadSeeker) error {
|
||||
args := bs.Called(bucket, name, metadata, backup)
|
||||
return args.Error(0)
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
@ -285,8 +286,38 @@ func (controller *restoreController) getValidationErrors(itm *api.Restore) []str
|
|||
return validationErrors
|
||||
}
|
||||
|
||||
func (controller *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) {
|
||||
backup, err := controller.backupLister.Backups(api.DefaultNamespace).Get(name)
|
||||
if err == nil {
|
||||
return backup, nil
|
||||
}
|
||||
|
||||
if !apierrors.IsNotFound(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Backup %q not found in backupLister, checking object storage directly.", name)
|
||||
backup, err = controller.backupService.GetBackup(bucket, name)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("Backup %q not found in object storage.", name)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ResourceVersion needs to be cleared in order to create the object in the API
|
||||
backup.ResourceVersion = ""
|
||||
|
||||
created, createErr := controller.backupClient.Backups(api.DefaultNamespace).Create(backup)
|
||||
if createErr != nil {
|
||||
glog.Errorf("Unable to create API object for backup %q: %v", name, createErr)
|
||||
} else {
|
||||
backup = created
|
||||
}
|
||||
|
||||
return backup, nil
|
||||
}
|
||||
|
||||
func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (warnings, errors api.RestoreResult) {
|
||||
backup, err := controller.backupLister.Backups(api.DefaultNamespace).Get(restore.Spec.BackupName)
|
||||
backup, err := controller.fetchBackup(bucket, restore.Spec.BackupName)
|
||||
if err != nil {
|
||||
glog.Errorf("error getting backup: %v", err)
|
||||
errors.Cluster = append(errors.Ark, err.Error())
|
||||
|
|
|
@ -35,6 +35,71 @@ import (
|
|||
. "github.com/heptio/ark/pkg/util/test"
|
||||
)
|
||||
|
||||
func TestFetchRestore(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
backupName string
|
||||
informerBackups []*api.Backup
|
||||
backupSvcBackups map[string][]*api.Backup
|
||||
expectedRes *api.Backup
|
||||
expectedErr bool
|
||||
}{
|
||||
{
|
||||
name: "lister has backup",
|
||||
backupName: "backup-1",
|
||||
informerBackups: []*api.Backup{NewTestBackup().WithName("backup-1").Backup},
|
||||
expectedRes: NewTestBackup().WithName("backup-1").Backup,
|
||||
},
|
||||
{
|
||||
name: "backupSvc has backup",
|
||||
backupName: "backup-1",
|
||||
backupSvcBackups: map[string][]*api.Backup{
|
||||
"bucket": []*api.Backup{NewTestBackup().WithName("backup-1").Backup},
|
||||
},
|
||||
expectedRes: NewTestBackup().WithName("backup-1").Backup,
|
||||
},
|
||||
{
|
||||
name: "no backup",
|
||||
backupName: "backup-1",
|
||||
expectedErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
var (
|
||||
client = fake.NewSimpleClientset()
|
||||
restorer = &fakeRestorer{}
|
||||
sharedInformers = informers.NewSharedInformerFactory(client, 0)
|
||||
backupSvc = &fakeBackupService{}
|
||||
)
|
||||
|
||||
c := NewRestoreController(
|
||||
sharedInformers.Ark().V1().Restores(),
|
||||
client.ArkV1(),
|
||||
client.ArkV1(),
|
||||
restorer,
|
||||
backupSvc,
|
||||
"bucket",
|
||||
sharedInformers.Ark().V1().Backups(),
|
||||
false,
|
||||
).(*restoreController)
|
||||
|
||||
for _, itm := range test.informerBackups {
|
||||
sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(itm)
|
||||
}
|
||||
|
||||
backupSvc.backupsByBucket = test.backupSvcBackups
|
||||
|
||||
backup, err := c.fetchBackup("bucket", test.backupName)
|
||||
|
||||
if assert.Equal(t, test.expectedErr, err != nil) {
|
||||
assert.Equal(t, test.expectedRes, backup)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessRestore(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
@ -92,7 +157,9 @@ func TestProcessRestore(t *testing.T) {
|
|||
WithBackup("backup-1").
|
||||
WithRestorableNamespace("ns-1").
|
||||
WithErrors(api.RestoreResult{
|
||||
Cluster: []string{"backup.ark.heptio.com \"backup-1\" not found"},
|
||||
// TODO this is the error msg returned by the fakeBackupService. When we switch to a mock obj,
|
||||
// this will likely need to change.
|
||||
Cluster: []string{"bucket not found"},
|
||||
}).
|
||||
Restore,
|
||||
},
|
||||
|
|
|
@ -55,3 +55,17 @@ func (f *FakeBackupService) DeleteBackup(bucket, backupName string) error {
|
|||
args := f.Called(bucket, backupName)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (f *FakeBackupService) GetBackup(bucket, name string) (*v1.Backup, error) {
|
||||
var (
|
||||
args = f.Called(bucket, name)
|
||||
b = args.Get(0)
|
||||
backup *v1.Backup
|
||||
)
|
||||
|
||||
if b != nil {
|
||||
backup = b.(*v1.Backup)
|
||||
}
|
||||
|
||||
return backup, args.Error(1)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue