Add back code and tests

Signed-off-by: Carlisia <carlisiac@vmware.com>
pull/1390/head
Carlisia 2019-04-23 16:19:00 -07:00
parent c59d03dfb1
commit 38ccb40ca1
No known key found for this signature in database
GPG Key ID: EE2E6F4D2C4B7117
6 changed files with 330 additions and 172 deletions

View File

@ -37,9 +37,97 @@ const (
storageAccountConfigKey = "storageAccount"
)
type containerGetter interface {
getContainer(bucket string) (container, error)
}
type azureContainerGetter struct {
blobService *storage.BlobStorageClient
}
func (cg *azureContainerGetter) getContainer(bucket string) (container, error) {
container := cg.blobService.GetContainerReference(bucket)
if container == nil {
return nil, errors.Errorf("unable to get container reference for bucket %v", bucket)
}
return &azureContainer{
container: container,
}, nil
}
type container interface {
ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
}
type azureContainer struct {
container *storage.Container
}
func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
return c.container.ListBlobs(params)
}
type blobGetter interface {
getBlob(bucket, key string) (blob, error)
}
type azureBlobGetter struct {
blobService *storage.BlobStorageClient
}
func (bg *azureBlobGetter) getBlob(bucket, key string) (blob, error) {
container := bg.blobService.GetContainerReference(bucket)
if container == nil {
return nil, errors.Errorf("unable to get container reference for bucket %v", bucket)
}
blob := container.GetBlobReference(key)
if blob == nil {
return nil, errors.Errorf("unable to get blob reference for key %v", key)
}
return &azureBlob{
blob: blob,
}, nil
}
type blob interface {
CreateBlockBlobFromReader(blob io.Reader, options *storage.PutBlobOptions) error
Exists() (bool, error)
Get(options *storage.GetBlobOptions) (io.ReadCloser, error)
Delete(options *storage.DeleteBlobOptions) error
GetSASURI(options *storage.BlobSASOptions) (string, error)
}
type azureBlob struct {
blob *storage.Blob
}
func (b *azureBlob) CreateBlockBlobFromReader(blob io.Reader, options *storage.PutBlobOptions) error {
return b.blob.CreateBlockBlobFromReader(blob, options)
}
func (b *azureBlob) Exists() (bool, error) {
return b.blob.Exists()
}
func (b *azureBlob) Get(options *storage.GetBlobOptions) (io.ReadCloser, error) {
return b.blob.Get(options)
}
func (b *azureBlob) Delete(options *storage.DeleteBlobOptions) error {
return b.blob.Delete(options)
}
func (b *azureBlob) GetSASURI(options *storage.BlobSASOptions) (string, error) {
return b.blob.GetSASURI(*options)
}
type ObjectStore struct {
blobClient *storage.BlobStorageClient
log logrus.FieldLogger
containerGetter containerGetter
blobGetter blobGetter
log logrus.FieldLogger
}
func NewObjectStore(logger logrus.FieldLogger) *ObjectStore {
@ -122,18 +210,18 @@ func (o *ObjectStore) Init(config map[string]string) error {
}
blobClient := storageClient.GetBlobService()
o.blobClient = &blobClient
o.containerGetter = &azureContainerGetter{
blobService: &blobClient,
}
o.blobGetter = &azureBlobGetter{
blobService: &blobClient,
}
return nil
}
func (o *ObjectStore) PutObject(bucket, key string, body io.Reader) error {
container, err := getContainerReference(o.blobClient, bucket)
if err != nil {
return err
}
blob, err := getBlobReference(container, key)
blob, err := o.blobGetter.getBlob(bucket, key)
if err != nil {
return err
}
@ -142,12 +230,7 @@ func (o *ObjectStore) PutObject(bucket, key string, body io.Reader) error {
}
func (o *ObjectStore) ObjectExists(bucket, key string) (bool, error) {
container, err := getContainerReference(o.blobClient, bucket)
if err != nil {
return false, err
}
blob, err := getBlobReference(container, key)
blob, err := o.blobGetter.getBlob(bucket, key)
if err != nil {
return false, err
}
@ -161,12 +244,7 @@ func (o *ObjectStore) ObjectExists(bucket, key string) (bool, error) {
}
func (o *ObjectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
container, err := getContainerReference(o.blobClient, bucket)
if err != nil {
return nil, err
}
blob, err := getBlobReference(container, key)
blob, err := o.blobGetter.getBlob(bucket, key)
if err != nil {
return nil, err
}
@ -180,7 +258,7 @@ func (o *ObjectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
}
func (o *ObjectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
container, err := getContainerReference(o.blobClient, bucket)
container, err := o.containerGetter.getContainer(bucket)
if err != nil {
return nil, err
}
@ -199,7 +277,7 @@ func (o *ObjectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]st
}
func (o *ObjectStore) ListObjects(bucket, prefix string) ([]string, error) {
container, err := getContainerReference(o.blobClient, bucket)
container, err := o.containerGetter.getContainer(bucket)
if err != nil {
return nil, err
}
@ -222,12 +300,7 @@ func (o *ObjectStore) ListObjects(bucket, prefix string) ([]string, error) {
}
func (o *ObjectStore) DeleteObject(bucket string, key string) error {
container, err := getContainerReference(o.blobClient, bucket)
if err != nil {
return err
}
blob, err := getBlobReference(container, key)
blob, err := o.blobGetter.getBlob(bucket, key)
if err != nil {
return err
}
@ -236,12 +309,7 @@ func (o *ObjectStore) DeleteObject(bucket string, key string) error {
}
func (o *ObjectStore) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) {
container, err := getContainerReference(o.blobClient, bucket)
if err != nil {
return "", err
}
blob, err := getBlobReference(container, key)
blob, err := o.blobGetter.getBlob(bucket, key)
if err != nil {
return "", err
}
@ -255,23 +323,5 @@ func (o *ObjectStore) CreateSignedURL(bucket, key string, ttl time.Duration) (st
},
}
return blob.GetSASURI(opts)
}
func getContainerReference(blobClient *storage.BlobStorageClient, bucket string) (*storage.Container, error) {
container := blobClient.GetContainerReference(bucket)
if container == nil {
return nil, errors.Errorf("unable to get container reference for bucket %v", bucket)
}
return container, nil
}
func getBlobReference(container *storage.Container, key string) (*storage.Blob, error) {
blob := container.GetBlobReference(key)
if blob == nil {
return nil, errors.Errorf("unable to get blob reference for key %v", key)
}
return blob, nil
return blob.GetSASURI(&opts)
}

View File

@ -17,68 +17,16 @@ limitations under the License.
package azure
import (
"io"
"testing"
"github.com/Azure/azure-sdk-for-go/storage"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
// type mockBlobGetter struct {
// mock.Mock
// }
// func (m *mockBlobGetter) getBlob(bucket string, key string) (blob, error) {
// args := m.Called(bucket, key)
// return args.Get(0).(blob), args.Error(1)
// }
// type mockBlob struct {
// mock.Mock
// }
// func (m *mockBlob) CreateBlockBlobFromReader(blob io.Reader, options *storage.PutBlobOptions) error {
// args := m.Called(blob, options)
// return args.Error(0)
// }
// func (m *mockBlob) Exists() (bool, error) {
// args := m.Called()
// return args.Bool(0), args.Error(1)
// }
// func (m *mockBlob) Get(options *storage.GetBlobOptions) (io.ReadCloser, error) {
// args := m.Called(options)
// return args.Get(0).(io.ReadCloser), args.Error(1)
// }
// func (m *mockBlob) Delete(options *storage.DeleteBlobOptions) error {
// args := m.Called(options)
// return args.Error(0)
// }
// func (m *mockBlob) GetSASURI(expiry time.Time, permissions string) (string, error) {
// args := m.Called(expiry, permissions)
// return args.String(0), args.Error(1)
// }
// type mockContainerGetter struct {
// mock.Mock
// }
// func (m *mockContainerGetter) getContainer(bucket string) (container, error) {
// args := m.Called(bucket)
// return args.Get(0).(container), args.Error(1)
// }
// type mockContainer struct {
// mock.Mock
// }
// func (m *mockContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
// args := m.Called(params)
// return args.Get(0).(storage.BlobListResponse), args.Error(1)
// }
func TestObjectExists(t *testing.T) {
tests := []struct {
name string
@ -116,69 +64,89 @@ func TestObjectExists(t *testing.T) {
},
}
// for _, tc := range tests {
// t.Run(tc.name, func(t *testing.T) {
// blobGetter := new(mockBlobGetter)
// defer blobGetter.AssertExpectations(t)
// o := &objectStore{
// blobGetter: blobGetter,
// }
// bucket := "b"
// key := "k"
// blob := new(mockBlob)
// defer blob.AssertExpectations(t)
// blobGetter.On("getBlob", bucket, key).Return(blob, tc.getBlobError)
// blob.On("Exists").Return(tc.exists, tc.errorResponse)
// exists, err := o.ObjectExists(bucket, key)
// if tc.expectedError != "" {
// assert.EqualError(t, err, tc.expectedError)
// return
// }
// require.NoError(t, err)
// assert.Equal(t, tc.expectedExists, exists)
// })
// }
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// o := NewObjectStore(velerotest.NewLogger())
blobGetter := new(mockBlobGetter)
defer blobGetter.AssertExpectations(t)
// bucket := "b"
// key := "k"
o := &ObjectStore{
blobGetter: blobGetter,
}
// // container := new(mockContainer)
// // container.On("getContainerReference", nil, bucket)
bucket := "b"
key := "k"
// blob := new(mockBlob)
// defer blob.AssertExpectations(t)
// // blob.On("getBlobReference", container, key).Return(blob, tc.getBlobError)
// blob.On("Exists").Return(tc.exists, tc.errorResponse)
blob := new(mockBlob)
defer blob.AssertExpectations(t)
blobGetter.On("getBlob", bucket, key).Return(blob, tc.getBlobError)
// exists, err := o.ObjectExists(bucket, key)
blob.On("Exists").Return(tc.exists, tc.errorResponse)
// if tc.expectedError != "" {
// assert.EqualError(t, err, tc.expectedError)
// return
// }
// require.NoError(t, err)
exists, err := o.ObjectExists(bucket, key)
// assert.Equal(t, tc.expectedExists, exists)
if tc.expectedError != "" {
assert.EqualError(t, err, tc.expectedError)
return
}
require.NoError(t, err)
assert.Equal(t, tc.expectedExists, exists)
})
}
}
type mockBlobGetter struct {
mock.Mock
}
func (m *mockBlobGetter) getBlob(bucket string, key string) (blob, error) {
args := m.Called(bucket, key)
return args.Get(0).(blob), args.Error(1)
}
type mockBlob struct {
mock.Mock
}
func (m *mockBlob) CreateBlockBlobFromReader(blob io.Reader, options *storage.PutBlobOptions) error {
args := m.Called(blob, options)
return args.Error(0)
}
func (m *mockBlob) Exists() (bool, error) {
args := m.Called()
return args.Bool(0), args.Error(1)
}
func (m *mockBlob) Get(options *storage.GetBlobOptions) (io.ReadCloser, error) {
args := m.Called(options)
return args.Get(0).(io.ReadCloser), args.Error(1)
}
func (m *mockBlob) Delete(options *storage.DeleteBlobOptions) error {
args := m.Called(options)
return args.Error(0)
}
func (m *mockBlob) GetSASURI(options *storage.BlobSASOptions) (string, error) {
args := m.Called(options)
return args.String(0), args.Error(1)
}
type mockContainerGetter struct {
mock.Mock
}
func (m *mockContainerGetter) getContainer(bucket string) (container, error) {
args := m.Called(bucket)
return args.Get(0).(container), args.Error(1)
}
type mockContainer struct {
mock.Mock
}
func (m *mockContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
args := m.Called(params)
return args.Get(0).(storage.BlobListResponse), args.Error(1)
}

View File

@ -158,19 +158,22 @@ func (c *backupController) processBackup(key string) error {
log.Debug("Running processBackup")
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return errors.Wrap(err, "error splitting queue key")
log.WithError(err).Errorf("error splitting key")
return nil
}
log.Debug("Getting backup")
original, err := c.lister.Backups(ns).Get(name)
if apierrors.IsNotFound(err) {
log.Debug("backup not found")
log.Debugf("backup %s not found", name)
return nil
}
if err != nil {
return errors.Wrap(err, "error getting backup")
}
fmt.Println("original name is.....---- ", original.Name)
// Double-check we have the correct phase. In the unlikely event that multiple controller
// instances are running, it's possible for controller A to succeed in changing the phase to
// InProgress, while controller B's attempt to patch the phase fails. When controller B
@ -196,6 +199,9 @@ func (c *backupController) processBackup(key string) error {
request.Status.StartTimestamp.Time = c.clock.Now()
}
fmt.Println("request.Backup.Name name is.....---- ", request.Backup.Name)
fmt.Println("original name is.....---- ", original.Name)
// update status
updatedBackup, err := patchBackup(original, request.Backup, c.client)
if err != nil {
@ -284,11 +290,12 @@ func (c *backupController) prepareBackupRequest(backup *velerov1api.Backup) *pkg
}
request.Labels[velerov1api.StorageLocationLabel] = request.Spec.StorageLocation
// validate the included/excluded resources and namespaces
// validate the included/excluded resources
for _, err := range collections.ValidateIncludesExcludes(request.Spec.IncludedResources, request.Spec.ExcludedResources) {
request.Status.ValidationErrors = append(request.Status.ValidationErrors, fmt.Sprintf("Invalid included/excluded resource lists: %v", err))
}
// validate the included/excluded namespaces
for _, err := range collections.ValidateIncludesExcludes(request.Spec.IncludedNamespaces, request.Spec.ExcludedNamespaces) {
request.Status.ValidationErrors = append(request.Status.ValidationErrors, fmt.Sprintf("Invalid included/excluded namespace lists: %v", err))
}
@ -410,6 +417,8 @@ func (c *backupController) validateAndGetSnapshotLocations(backup *velerov1api.B
}
func (c *backupController) runBackup(backup *pkgbackup.Request) error {
fmt.Println("runbackup name is.....---- ", backup.Name)
log := c.logger.WithField("backup", kubeutil.NamespaceAndName(backup))
log.Info("Starting backup")
@ -451,6 +460,12 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {
}
var errs []error
errs = append(errs, validateUniqueness(backupStore, backup.StorageLocation.Spec.StorageType.ObjectStorage.Bucket, backup.Name)...)
if len(errs) > 0 {
backup.Status.Phase = velerov1api.BackupPhaseFailed
backup.Status.CompletionTimestamp.Time = c.clock.Now()
return kerrors.NewAggregate(errs)
}
// Do the actual backup
if err := c.backupper.Backup(log, backup, backupFile, actions, pluginManager); err != nil {
@ -483,6 +498,18 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {
return kerrors.NewAggregate(errs)
}
func validateUniqueness(backupStore persistence.BackupStore, bucket, name string) []error {
var errs []error
exists, err := backupStore.BackupExists(bucket, name)
if err != nil {
errs = append(errs, errors.Errorf("Error checking if backup already exists in object storage: %v", err))
}
if exists {
errs = append(errs, errors.Errorf("Backup already exists in object storage"))
}
return errs
}
func recordBackupMetrics(backup *velerov1api.Backup, backupFile *os.File, serverMetrics *metrics.ServerMetrics) error {
backupScheduleName := backup.GetLabels()[velerov1api.ScheduleNameLabel]

View File

@ -56,23 +56,24 @@ func (b *fakeBackupper) Backup(logger logrus.FieldLogger, backup *pkgbackup.Requ
return args.Error(0)
}
func TestProcessBackupNonProcessedItems(t *testing.T) {
func TestProcessBackupProcessing(t *testing.T) {
tests := []struct {
name string
key string
backup *v1.Backup
expectedErr string
}{
// processed successfully
{
name: "bad key returns error",
key: "bad/key/here",
expectedErr: "error splitting queue key: unexpected key format: \"bad/key/here\"",
name: "bad key does not return error",
key: "bad/key/here",
},
{
name: "backup not found in lister returns error",
key: "nonexistent/backup",
expectedErr: "error getting backup: backup.velero.io \"backup\" not found",
name: "backup not found in lister does not return error",
key: "nonexistent/backup",
},
// skipped
{
name: "FailedValidation backup is not processed",
key: "velero/backup-1",
@ -255,18 +256,21 @@ func TestDefaultBackupTTL(t *testing.T) {
}
func TestProcessBackupCompletions(t *testing.T) {
defaultBackupLocation := velerotest.NewTestBackupStorageLocation().WithName("loc-1").BackupStorageLocation
defaultBackupLocation := velerotest.NewTestBackupStorageLocation().WithName("loc-1").WithObjectStorage("store-1").BackupStorageLocation
now, err := time.Parse(time.RFC1123Z, time.RFC1123Z)
require.NoError(t, err)
now = now.Local()
tests := []struct {
name string
backup *v1.Backup
backupLocation *v1.BackupStorageLocation
expectedResult *v1.Backup
name string
backup *v1.Backup
backupLocation *v1.BackupStorageLocation
expectedResult *v1.Backup
backupExists bool
existenceCheckError error
}{
// Completed
{
name: "backup with no backup location gets the default",
backup: velerotest.NewTestBackup().WithName("backup-1").Backup,
@ -294,7 +298,7 @@ func TestProcessBackupCompletions(t *testing.T) {
{
name: "backup with a specific backup location keeps it",
backup: velerotest.NewTestBackup().WithName("backup-1").WithStorageLocation("alt-loc").Backup,
backupLocation: velerotest.NewTestBackupStorageLocation().WithName("alt-loc").BackupStorageLocation,
backupLocation: velerotest.NewTestBackupStorageLocation().WithName("alt-loc").WithObjectStorage("store-1").BackupStorageLocation,
expectedResult: &v1.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.DefaultNamespace,
@ -340,6 +344,83 @@ func TestProcessBackupCompletions(t *testing.T) {
},
},
},
{
name: "backup with existing backup will fail",
backupExists: false,
backup: velerotest.NewTestBackup().WithName("backup-1").Backup,
backupLocation: defaultBackupLocation,
expectedResult: &v1.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.DefaultNamespace,
Name: "backup-1",
Labels: map[string]string{
"velero.io/storage-location": "loc-1",
},
},
Spec: v1.BackupSpec{
StorageLocation: defaultBackupLocation.Name,
},
Status: v1.BackupStatus{
Phase: v1.BackupPhaseCompleted,
Version: 1,
StartTimestamp: metav1.NewTime(now),
CompletionTimestamp: metav1.NewTime(now),
Expiration: metav1.NewTime(now),
},
},
},
// Failed
{
name: "backup with existing backup will fail",
backupExists: true,
backup: velerotest.NewTestBackup().WithName("backup-1").Backup,
backupLocation: defaultBackupLocation,
expectedResult: &v1.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.DefaultNamespace,
Name: "backup-1",
Labels: map[string]string{
"velero.io/storage-location": "loc-1",
},
},
Spec: v1.BackupSpec{
StorageLocation: defaultBackupLocation.Name,
},
Status: v1.BackupStatus{
Phase: v1.BackupPhaseFailed,
Version: 1,
StartTimestamp: metav1.NewTime(now),
CompletionTimestamp: metav1.NewTime(now),
Expiration: metav1.NewTime(now),
},
},
},
{
name: "error when checking if backup exists will cause backup to fail",
backup: velerotest.NewTestBackup().WithName("backup-1").Backup,
existenceCheckError: errors.New("Backup already exists in object storage"),
backupLocation: defaultBackupLocation,
expectedResult: &v1.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.DefaultNamespace,
Name: "backup-1",
Labels: map[string]string{
"velero.io/storage-location": "loc-1",
},
},
Spec: v1.BackupSpec{
StorageLocation: defaultBackupLocation.Name,
},
Status: v1.BackupStatus{
Phase: v1.BackupPhaseFailed,
Version: 1,
StartTimestamp: metav1.NewTime(now),
CompletionTimestamp: metav1.NewTime(now),
Expiration: metav1.NewTime(now),
},
},
},
}
for _, test := range tests {
@ -380,6 +461,7 @@ func TestProcessBackupCompletions(t *testing.T) {
completionTimestampIsPresent := func(buf *bytes.Buffer) bool {
return strings.Contains(buf.String(), `"completionTimestamp": "2006-01-02T22:04:05Z"`)
}
backupStore.On("BackupExists", test.backupLocation.Spec.StorageType.ObjectStorage.Bucket, test.backup.Name).Return(test.backupExists, test.existenceCheckError)
backupStore.On("PutBackup", test.backup.Name, mock.MatchedBy(completionTimestampIsPresent), mock.Anything, mock.Anything, mock.Anything).Return(nil)
// add the test's backup to the informer/lister store
@ -406,6 +488,8 @@ func TestProcessBackupCompletions(t *testing.T) {
res, err := clientset.VeleroV1().Backups(test.backup.Namespace).Get(test.backup.Name, metav1.GetOptions{})
require.NoError(t, err)
// failed tests for failed backup should have a phase of failed
assert.Equal(t, test.expectedResult, res)
})
}

View File

@ -63,6 +63,27 @@ func (_m *BackupStore) GetBackupContents(name string) (io.ReadCloser, error) {
return r0, r1
}
// BackupExists provides a mock function with given fields: bucket, backupName
func (_m *BackupStore) BackupExists(bucket string, backupName string) (bool, error) {
ret := _m.Called(bucket, backupName)
var r0 bool
if rf, ok := ret.Get(0).(func(string, string) bool); ok {
r0 = rf(bucket, backupName)
} else {
r0 = ret.Get(0).(bool)
}
var r1 error
if rf, ok := ret.Get(1).(func(string, string) error); ok {
r1 = rf(bucket, backupName)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetBackupMetadata provides a mock function with given fields: name
func (_m *BackupStore) GetBackupMetadata(name string) (*v1.Backup, error) {
ret := _m.Called(name)

View File

@ -47,6 +47,10 @@ type BackupStore interface {
GetBackupMetadata(name string) (*velerov1api.Backup, error)
GetBackupVolumeSnapshots(name string) ([]*volume.Snapshot, error)
GetBackupContents(name string) (io.ReadCloser, error)
// BackupExists checks if the backup metadata file exists in object storage.
BackupExists(bucket, backupName string) (bool, error)
DeleteBackup(name string) error
PutRestoreLog(backup, restore string, log io.Reader) error
@ -288,6 +292,10 @@ func (s *objectBackupStore) GetBackupContents(name string) (io.ReadCloser, error
return s.objectStore.GetObject(s.bucket, s.layout.getBackupContentsKey(name))
}
func (s *objectBackupStore) BackupExists(bucket, backupName string) (bool, error) {
return s.objectStore.ObjectExists(bucket, s.layout.getBackupMetadataKey(backupName))
}
func (s *objectBackupStore) DeleteBackup(name string) error {
objects, err := s.objectStore.ListObjects(s.bucket, s.layout.getBackupDir(name))
if err != nil {