diff --git a/pkg/cloudprovider/aws/object_store.go b/pkg/cloudprovider/aws/object_store.go index 5df9714f6..0bb074cfa 100644 --- a/pkg/cloudprovider/aws/object_store.go +++ b/pkg/cloudprovider/aws/object_store.go @@ -116,7 +116,7 @@ func (o *objectStore) Init(config map[string]string) error { return nil } -func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error { +func (o *objectStore) PutObject(bucket, key string, body io.Reader) error { req := &s3manager.UploadInput{ Bucket: &bucket, Key: &key, @@ -134,7 +134,7 @@ func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error return errors.Wrapf(err, "error putting object %s", key) } -func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error) { +func (o *objectStore) GetObject(bucket, key string) (io.ReadCloser, error) { req := &s3.GetObjectInput{ Bucket: &bucket, Key: &key, @@ -148,9 +148,10 @@ func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error return res.Body, nil } -func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) { +func (o *objectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) { req := &s3.ListObjectsV2Input{ Bucket: &bucket, + Prefix: &prefix, Delimiter: &delimiter, } @@ -161,7 +162,6 @@ func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]str } return !lastPage }) - if err != nil { return nil, errors.WithStack(err) } @@ -190,7 +190,7 @@ func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) { return ret, nil } -func (o *objectStore) DeleteObject(bucket string, key string) error { +func (o *objectStore) DeleteObject(bucket, key string) error { req := &s3.DeleteObjectInput{ Bucket: &bucket, Key: &key, diff --git a/pkg/cloudprovider/azure/object_store.go b/pkg/cloudprovider/azure/object_store.go index 1b2a39288..2a3c89889 100644 --- a/pkg/cloudprovider/azure/object_store.go +++ b/pkg/cloudprovider/azure/object_store.go @@ -119,7 +119,7 @@ func (o *objectStore) Init(config map[string]string) error { return nil } -func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error { +func (o *objectStore) PutObject(bucket, key string, body io.Reader) error { container, err := getContainerReference(o.blobClient, bucket) if err != nil { return err @@ -133,7 +133,7 @@ func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error return errors.WithStack(blob.CreateBlockBlobFromReader(body, nil)) } -func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error) { +func (o *objectStore) GetObject(bucket, key string) (io.ReadCloser, error) { container, err := getContainerReference(o.blobClient, bucket) if err != nil { return nil, err @@ -152,13 +152,14 @@ func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error return res, nil } -func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) { +func (o *objectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) { container, err := getContainerReference(o.blobClient, bucket) if err != nil { return nil, err } params := storage.ListBlobsParameters{ + Prefix: prefix, Delimiter: delimiter, } @@ -167,14 +168,7 @@ func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]str return nil, errors.WithStack(err) } - // Azure returns prefixes inclusive of the last delimiter. We need to strip - // it. - ret := make([]string, 0, len(res.BlobPrefixes)) - for _, prefix := range res.BlobPrefixes { - ret = append(ret, prefix[0:strings.LastIndex(prefix, delimiter)]) - } - - return ret, nil + return res.BlobPrefixes, nil } func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) { diff --git a/pkg/cloudprovider/gcp/object_store.go b/pkg/cloudprovider/gcp/object_store.go index 777a7563c..6033e1914 100644 --- a/pkg/cloudprovider/gcp/object_store.go +++ b/pkg/cloudprovider/gcp/object_store.go @@ -21,7 +21,6 @@ import ( "io" "io/ioutil" "os" - "strings" "time" "cloud.google.com/go/storage" @@ -98,7 +97,7 @@ func (o *objectStore) Init(config map[string]string) error { return nil } -func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error { +func (o *objectStore) PutObject(bucket, key string, body io.Reader) error { w := o.bucketWriter.getWriteCloser(bucket, key) // The writer returned by NewWriter is asynchronous, so errors aren't guaranteed @@ -114,7 +113,7 @@ func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error return closeErr } -func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error) { +func (o *objectStore) GetObject(bucket, key string) (io.ReadCloser, error) { r, err := o.client.Bucket(bucket).Object(key).NewReader(context.Background()) if err != nil { return nil, errors.WithStack(err) @@ -123,28 +122,30 @@ func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error return r, nil } -func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) { +func (o *objectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) { q := &storage.Query{ + Prefix: prefix, Delimiter: delimiter, } - var res []string - iter := o.client.Bucket(bucket).Objects(context.Background(), q) + var res []string for { obj, err := iter.Next() - if err == iterator.Done { - return res, nil - } - if err != nil { + if err != nil && err != iterator.Done { return nil, errors.WithStack(err) } + if err == iterator.Done { + break + } if obj.Prefix != "" { - res = append(res, obj.Prefix[0:strings.LastIndex(obj.Prefix, delimiter)]) + res = append(res, obj.Prefix) } } + + return res, nil } func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) { @@ -169,7 +170,7 @@ func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) { } } -func (o *objectStore) DeleteObject(bucket string, key string) error { +func (o *objectStore) DeleteObject(bucket, key string) error { return errors.Wrapf(o.client.Bucket(bucket).Object(key).Delete(context.Background()), "error deleting object %s", key) } diff --git a/pkg/cloudprovider/in_memory_object_store.go b/pkg/cloudprovider/in_memory_object_store.go new file mode 100644 index 000000000..ebb867e1b --- /dev/null +++ b/pkg/cloudprovider/in_memory_object_store.go @@ -0,0 +1,168 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudprovider + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "strings" + "time" +) + +type BucketData map[string][]byte + +// InMemoryObjectStore is a simple implementation of the ObjectStore interface +// that stores its data in-memory/in-proc. This is mainly intended to be used +// as a test fake. +type InMemoryObjectStore struct { + Data map[string]BucketData +} + +func NewInMemoryObjectStore(buckets ...string) *InMemoryObjectStore { + o := &InMemoryObjectStore{ + Data: make(map[string]BucketData), + } + + for _, bucket := range buckets { + o.Data[bucket] = make(map[string][]byte) + } + + return o +} + +// +// Interface Implementation +// + +func (o *InMemoryObjectStore) Init(config map[string]string) error { + return nil +} + +func (o *InMemoryObjectStore) PutObject(bucket, key string, body io.Reader) error { + bucketData, ok := o.Data[bucket] + if !ok { + return errors.New("bucket not found") + } + + obj, err := ioutil.ReadAll(body) + if err != nil { + return err + } + + bucketData[key] = obj + + return nil +} + +func (o *InMemoryObjectStore) GetObject(bucket, key string) (io.ReadCloser, error) { + bucketData, ok := o.Data[bucket] + if !ok { + return nil, errors.New("bucket not found") + } + + obj, ok := bucketData[key] + if !ok { + return nil, errors.New("key not found") + } + + return ioutil.NopCloser(bytes.NewReader(obj)), nil +} + +func (o *InMemoryObjectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) { + keys, err := o.ListObjects(bucket, prefix) + if err != nil { + return nil, err + } + + // For each key, check if it has an instance of the delimiter *after* the prefix. + // If not, skip it; if so, return the prefix of the key up to/including the delimiter. + + var prefixes []string + for _, key := range keys { + // everything after 'prefix' + afterPrefix := key[len(prefix):] + + // index of the *start* of 'delimiter' in 'afterPrefix' + delimiterStart := strings.Index(afterPrefix, delimiter) + if delimiterStart == -1 { + continue + } + + // return the prefix, plus everything after the prefix and before + // the delimiter, plus the delimiter + fullPrefix := prefix + afterPrefix[0:delimiterStart] + delimiter + + prefixes = append(prefixes, fullPrefix) + } + + return prefixes, nil +} + +func (o *InMemoryObjectStore) ListObjects(bucket, prefix string) ([]string, error) { + bucketData, ok := o.Data[bucket] + if !ok { + return nil, errors.New("bucket not found") + } + + var objs []string + for key := range bucketData { + if strings.HasPrefix(key, prefix) { + objs = append(objs, key) + } + } + + return objs, nil +} + +func (o *InMemoryObjectStore) DeleteObject(bucket, key string) error { + bucketData, ok := o.Data[bucket] + if !ok { + return errors.New("bucket not found") + } + + delete(bucketData, key) + + return nil +} + +func (o *InMemoryObjectStore) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) { + bucketData, ok := o.Data[bucket] + if !ok { + return "", errors.New("bucket not found") + } + + _, ok = bucketData[key] + if !ok { + return "", errors.New("key not found") + } + + return "a-url", nil +} + +// +// Test Helper Methods +// + +func (o *InMemoryObjectStore) ClearBucket(bucket string) { + if _, ok := o.Data[bucket]; !ok { + return + } + + o.Data[bucket] = make(map[string][]byte) +} diff --git a/pkg/cloudprovider/mocks/backup_lister.go b/pkg/cloudprovider/mocks/backup_lister.go deleted file mode 100644 index f7d901c68..000000000 --- a/pkg/cloudprovider/mocks/backup_lister.go +++ /dev/null @@ -1,48 +0,0 @@ -/* -Copyright 2018 the Heptio Ark contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -// Code generated by mockery v1.0.0. DO NOT EDIT. -package mocks - -import mock "github.com/stretchr/testify/mock" -import v1 "github.com/heptio/ark/pkg/apis/ark/v1" - -// BackupLister is an autogenerated mock type for the BackupLister type -type BackupLister struct { - mock.Mock -} - -// ListBackups provides a mock function with given fields: bucket -func (_m *BackupLister) ListBackups(bucket string) ([]*v1.Backup, error) { - ret := _m.Called(bucket) - - var r0 []*v1.Backup - if rf, ok := ret.Get(0).(func(string) []*v1.Backup); ok { - r0 = rf(bucket) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*v1.Backup) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(string) error); ok { - r1 = rf(bucket) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} diff --git a/pkg/cloudprovider/object_store.go b/pkg/cloudprovider/object_store.go index 4cfc8f3a4..f849ca7a0 100644 --- a/pkg/cloudprovider/object_store.go +++ b/pkg/cloudprovider/object_store.go @@ -31,17 +31,23 @@ type ObjectStore interface { // PutObject creates a new object using the data in body within the specified // object storage bucket with the given key. - PutObject(bucket string, key string, body io.Reader) error + PutObject(bucket, key string, body io.Reader) error // GetObject retrieves the object with the given key from the specified // bucket in object storage. - GetObject(bucket string, key string) (io.ReadCloser, error) + GetObject(bucket, key string) (io.ReadCloser, error) - // ListCommonPrefixes gets a list of all object key prefixes that come - // before the provided delimiter. For example, if the bucket contains - // the keys "foo-1/bar", "foo-1/baz", and "foo-2/baz", and the delimiter - // is "/", this will return the slice {"foo-1", "foo-2"}. - ListCommonPrefixes(bucket string, delimiter string) ([]string, error) + // ListCommonPrefixes gets a list of all object key prefixes that start with + // the specified prefix and stop at the next instance of the provided delimiter. + // + // For example, if the bucket contains the following keys: + // a-prefix/foo-1/bar + // a-prefix/foo-1/baz + // a-prefix/foo-2/baz + // some-other-prefix/foo-3/bar + // and the provided prefix arg is "a-prefix/", and the delimiter is "/", + // this will return the slice {"a-prefix/foo-1/", "a-prefix/foo-2/"}. + ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) // ListObjects gets a list of all keys in the specified bucket // that have the given prefix. @@ -49,7 +55,7 @@ type ObjectStore interface { // DeleteObject removes the object with the specified key from the given // bucket. - DeleteObject(bucket string, key string) error + DeleteObject(bucket, key string) error // CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl. CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index 43a731b84..50dbf1058 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -42,7 +42,6 @@ import ( api "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/backup" - "github.com/heptio/ark/pkg/cloudprovider" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" @@ -74,6 +73,7 @@ type backupController struct { backupLocationListerSynced cache.InformerSynced defaultBackupLocation string metrics *metrics.ServerMetrics + newBackupStore func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) } func NewBackupController( @@ -105,6 +105,8 @@ func NewBackupController( backupLocationListerSynced: backupLocationInformer.Informer().HasSynced, defaultBackupLocation: defaultBackupLocation, metrics: metrics, + + newBackupStore: persistence.NewObjectBackupStore, } c.syncHandler = c.processBackup @@ -382,21 +384,21 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation log.Info("Starting backup") - pluginManager := controller.newPluginManager(log) - defer pluginManager.CleanupClients() - backupFile, err := ioutil.TempFile("", "") if err != nil { return errors.Wrap(err, "error creating temp file for backup") } defer closeAndRemoveFile(backupFile, log) + pluginManager := controller.newPluginManager(log) + defer pluginManager.CleanupClients() + actions, err := pluginManager.GetBackupItemActions() if err != nil { return err } - objectStore, err := getObjectStoreForLocation(backupLocation, pluginManager) + backupStore, err := controller.newBackupStore(backupLocation, pluginManager, log) if err != nil { return err } @@ -438,7 +440,7 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation controller.logger.WithError(err).Error("error closing gzippedLogFile") } - if err := persistence.UploadBackup(log, objectStore, backupLocation.Spec.ObjectStorage.Bucket, backup.Name, backupJSONToUpload, backupFileToUpload, logFile); err != nil { + if err := backupStore.PutBackup(backup.Name, backupJSONToUpload, backupFileToUpload, logFile); err != nil { errs = append(errs, err) } @@ -454,34 +456,6 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation return kerrors.NewAggregate(errs) } -// TODO(ncdc): move this to a better location that isn't backup specific -func getObjectStoreForLocation(location *api.BackupStorageLocation, manager plugin.Manager) (cloudprovider.ObjectStore, error) { - if location.Spec.Provider == "" { - return nil, errors.New("backup storage location provider name must not be empty") - } - - objectStore, err := manager.GetObjectStore(location.Spec.Provider) - if err != nil { - return nil, err - } - - // add the bucket name to the config map so that object stores can use - // it when initializing. The AWS object store uses this to determine the - // bucket's region when setting up its client. - if location.Spec.ObjectStorage != nil { - if location.Spec.Config == nil { - location.Spec.Config = make(map[string]string) - } - location.Spec.Config["bucket"] = location.Spec.ObjectStorage.Bucket - } - - if err := objectStore.Init(location.Spec.Config); err != nil { - return nil, err - } - - return objectStore, nil -} - func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) { if err := file.Close(); err != nil { log.WithError(err).WithField("file", file.Name()).Error("error closing file") diff --git a/pkg/controller/backup_controller_test.go b/pkg/controller/backup_controller_test.go index 97c6fd63a..8896cc63d 100644 --- a/pkg/controller/backup_controller_test.go +++ b/pkg/controller/backup_controller_test.go @@ -19,26 +19,27 @@ package controller import ( "bytes" "encoding/json" - "fmt" "io" "strings" "testing" "time" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" core "k8s.io/client-go/testing" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/backup" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" informers "github.com/heptio/ark/pkg/generated/informers/externalversions" "github.com/heptio/ark/pkg/metrics" + "github.com/heptio/ark/pkg/persistence" + persistencemocks "github.com/heptio/ark/pkg/persistence/mocks" "github.com/heptio/ark/pkg/plugin" pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" "github.com/heptio/ark/pkg/util/collections" @@ -179,12 +180,12 @@ func TestProcessBackup(t *testing.T) { sharedInformers = informers.NewSharedInformerFactory(client, 0) logger = logging.DefaultLogger(logrus.DebugLevel) clockTime, _ = time.Parse("Mon Jan 2 15:04:05 2006", "Mon Jan 2 15:04:05 2006") - objectStore = &arktest.ObjectStore{} pluginManager = &pluginmocks.Manager{} + backupStore = &persistencemocks.BackupStore{} ) defer backupper.AssertExpectations(t) - defer objectStore.AssertExpectations(t) defer pluginManager.AssertExpectations(t) + defer backupStore.AssertExpectations(t) c := NewBackupController( sharedInformers.Ark().V1().Backups(), @@ -202,6 +203,10 @@ func TestProcessBackup(t *testing.T) { c.clock = clock.NewFakeClock(clockTime) + c.newBackupStore = func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) { + return backupStore, nil + } + var expiration, startTime time.Time if test.backup != nil { @@ -217,9 +222,6 @@ func TestProcessBackup(t *testing.T) { } if test.expectBackup { - pluginManager.On("GetObjectStore", "myCloud").Return(objectStore, nil) - objectStore.On("Init", mock.Anything).Return(nil) - // set up a Backup object to represent what we expect to be passed to backupper.Backup() backup := test.backup.DeepCopy() backup.Spec.IncludedResources = test.expectedIncludes @@ -278,11 +280,8 @@ func TestProcessBackup(t *testing.T) { return strings.Contains(json, timeString) } - objectStore.On("PutObject", "bucket", fmt.Sprintf("%s/%s-logs.gz", test.backup.Name, test.backup.Name), mock.Anything).Return(nil) - objectStore.On("PutObject", "bucket", fmt.Sprintf("%s/ark-backup.json", test.backup.Name), mock.MatchedBy(completionTimestampIsPresent)).Return(nil) - objectStore.On("PutObject", "bucket", fmt.Sprintf("%s/%s.tar.gz", test.backup.Name, test.backup.Name), mock.Anything).Return(nil) - - pluginManager.On("CleanupClients") + backupStore.On("PutBackup", test.backup.Name, mock.MatchedBy(completionTimestampIsPresent), mock.Anything, mock.Anything).Return(nil) + pluginManager.On("CleanupClients").Return() } // this is necessary so the Patch() call returns the appropriate object diff --git a/pkg/controller/backup_deletion_controller.go b/pkg/controller/backup_deletion_controller.go index 9c63e0c3e..427ea142e 100644 --- a/pkg/controller/backup_deletion_controller.go +++ b/pkg/controller/backup_deletion_controller.go @@ -58,10 +58,10 @@ type backupDeletionController struct { resticMgr restic.RepositoryManager podvolumeBackupLister listers.PodVolumeBackupLister backupLocationLister listers.BackupStorageLocationLister - deleteBackupDir persistence.DeleteBackupDirFunc processRequestFunc func(*v1.DeleteBackupRequest) error clock clock.Clock newPluginManager func(logrus.FieldLogger) plugin.Manager + newBackupStore func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) } // NewBackupDeletionController creates a new backup deletion controller. @@ -95,7 +95,7 @@ func NewBackupDeletionController( // use variables to refer to these functions so they can be // replaced with fakes for testing. newPluginManager: newPluginManager, - deleteBackupDir: persistence.DeleteBackupDir, + newBackupStore: persistence.NewObjectBackupStore, clock: &clock.RealClock{}, } @@ -322,12 +322,12 @@ func (c *backupDeletionController) deleteBackupFromStorage(backup *v1.Backup, lo return errors.WithStack(err) } - objectStore, err := getObjectStoreForLocation(backupLocation, pluginManager) + backupStore, err := c.newBackupStore(backupLocation, pluginManager, log) if err != nil { return err } - if err := c.deleteBackupDir(log, objectStore, backupLocation.Spec.ObjectStorage.Bucket, backup.Name); err != nil { + if err := backupStore.DeleteBackup(backup.Name); err != nil { return errors.Wrap(err, "error deleting backup from backup storage") } diff --git a/pkg/controller/backup_deletion_controller_test.go b/pkg/controller/backup_deletion_controller_test.go index 1a1b6df77..a2c18b6e9 100644 --- a/pkg/controller/backup_deletion_controller_test.go +++ b/pkg/controller/backup_deletion_controller_test.go @@ -21,25 +21,27 @@ import ( "testing" "time" - "github.com/heptio/ark/pkg/apis/ark/v1" - pkgbackup "github.com/heptio/ark/pkg/backup" - "github.com/heptio/ark/pkg/cloudprovider" - "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" - informers "github.com/heptio/ark/pkg/generated/informers/externalversions" - "github.com/heptio/ark/pkg/plugin" - pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" - arktest "github.com/heptio/ark/pkg/util/test" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" core "k8s.io/client-go/testing" + + "github.com/heptio/ark/pkg/apis/ark/v1" + pkgbackup "github.com/heptio/ark/pkg/backup" + "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" + informers "github.com/heptio/ark/pkg/generated/informers/externalversions" + "github.com/heptio/ark/pkg/persistence" + persistencemocks "github.com/heptio/ark/pkg/persistence/mocks" + "github.com/heptio/ark/pkg/plugin" + pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" + arktest "github.com/heptio/ark/pkg/util/test" ) func TestBackupDeletionControllerProcessQueueItem(t *testing.T) { @@ -112,7 +114,7 @@ type backupDeletionControllerTestData struct { client *fake.Clientset sharedInformers informers.SharedInformerFactory blockStore *arktest.FakeBlockStore - objectStore *arktest.ObjectStore + backupStore *persistencemocks.BackupStore controller *backupDeletionController req *v1.DeleteBackupRequest } @@ -123,7 +125,7 @@ func setupBackupDeletionControllerTest(objects ...runtime.Object) *backupDeletio sharedInformers = informers.NewSharedInformerFactory(client, 0) blockStore = &arktest.FakeBlockStore{SnapshotsTaken: sets.NewString()} pluginManager = &pluginmocks.Manager{} - objectStore = &arktest.ObjectStore{} + backupStore = &persistencemocks.BackupStore{} req = pkgbackup.NewDeleteBackupRequest("foo", "uid") ) @@ -131,7 +133,7 @@ func setupBackupDeletionControllerTest(objects ...runtime.Object) *backupDeletio client: client, sharedInformers: sharedInformers, blockStore: blockStore, - objectStore: objectStore, + backupStore: backupStore, controller: NewBackupDeletionController( arktest.NewLogger(), sharedInformers.Ark().V1().DeleteBackupRequests(), @@ -150,7 +152,10 @@ func setupBackupDeletionControllerTest(objects ...runtime.Object) *backupDeletio req: req, } - pluginManager.On("GetObjectStore", "objStoreProvider").Return(objectStore, nil) + data.controller.newBackupStore = func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) { + return backupStore, nil + } + pluginManager.On("CleanupClients").Return(nil) req.Namespace = "heptio-ark" @@ -388,8 +393,6 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) { } require.NoError(t, td.sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(location)) - td.objectStore.On("Init", mock.Anything).Return(nil) - // Clear out req labels to make sure the controller adds them td.req.Labels = make(map[string]string) @@ -406,12 +409,7 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) { return true, backup, nil }) - td.controller.deleteBackupDir = func(_ logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket, backupName string) error { - require.NotNil(t, objectStore) - require.Equal(t, location.Spec.ObjectStorage.Bucket, bucket) - require.Equal(t, td.req.Spec.BackupName, backupName) - return nil - } + td.backupStore.On("DeleteBackup", td.req.Spec.BackupName).Return(nil) err := td.controller.processRequest(td.req) require.NoError(t, err) diff --git a/pkg/controller/backup_sync_controller.go b/pkg/controller/backup_sync_controller.go index f1f1700e6..2c6a25326 100644 --- a/pkg/controller/backup_sync_controller.go +++ b/pkg/controller/backup_sync_controller.go @@ -29,7 +29,6 @@ import ( "k8s.io/client-go/tools/cache" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/cloudprovider" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" @@ -48,7 +47,7 @@ type backupSyncController struct { namespace string defaultBackupLocation string newPluginManager func(logrus.FieldLogger) plugin.Manager - listCloudBackups func(logrus.FieldLogger, cloudprovider.ObjectStore, string) ([]*arkv1api.Backup, error) + newBackupStore func(*arkv1api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) } func NewBackupSyncController( @@ -77,7 +76,7 @@ func NewBackupSyncController( // use variables to refer to these functions so they can be // replaced with fakes for testing. newPluginManager: newPluginManager, - listCloudBackups: persistence.ListBackups, + newBackupStore: persistence.NewObjectBackupStore, } c.resyncFunc = c.run @@ -109,19 +108,19 @@ func (c *backupSyncController) run() { log := c.logger.WithField("backupLocation", location.Name) log.Info("Syncing backups from backup location") - objectStore, err := getObjectStoreForLocation(location, pluginManager) + backupStore, err := c.newBackupStore(location, pluginManager, log) if err != nil { - log.WithError(err).Error("Error getting object store for location") + log.WithError(err).Error("Error getting backup store for location") continue } - backupsInBackupStore, err := c.listCloudBackups(log, objectStore, location.Spec.ObjectStorage.Bucket) + backupsInBackupStore, err := backupStore.ListBackups() if err != nil { - log.WithError(err).Error("Error listing backups in object store") + log.WithError(err).Error("Error listing backups in backup store") continue } - log.WithField("backupCount", len(backupsInBackupStore)).Info("Got backups from object store") + log.WithField("backupCount", len(backupsInBackupStore)).Info("Got backups from backup store") cloudBackupNames := sets.NewString() for _, cloudBackup := range backupsInBackupStore { diff --git a/pkg/controller/backup_sync_controller_test.go b/pkg/controller/backup_sync_controller_test.go index a69d31c4b..72de8802e 100644 --- a/pkg/controller/backup_sync_controller_test.go +++ b/pkg/controller/backup_sync_controller_test.go @@ -20,25 +20,23 @@ import ( "testing" "time" - "github.com/pkg/errors" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" core "k8s.io/client-go/testing" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" informers "github.com/heptio/ark/pkg/generated/informers/externalversions" + "github.com/heptio/ark/pkg/persistence" + persistencemocks "github.com/heptio/ark/pkg/persistence/mocks" "github.com/heptio/ark/pkg/plugin" pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" "github.com/heptio/ark/pkg/util/stringslice" arktest "github.com/heptio/ark/pkg/util/test" - "github.com/stretchr/testify/assert" ) func defaultLocationsList(namespace string) []*arkv1api.BackupStorageLocation { @@ -167,7 +165,7 @@ func TestBackupSyncControllerRun(t *testing.T) { client = fake.NewSimpleClientset() sharedInformers = informers.NewSharedInformerFactory(client, 0) pluginManager = &pluginmocks.Manager{} - objectStore = &arktest.ObjectStore{} + backupStores = make(map[string]*persistencemocks.BackupStore) ) c := NewBackupSyncController( @@ -181,22 +179,23 @@ func TestBackupSyncControllerRun(t *testing.T) { arktest.NewLogger(), ).(*backupSyncController) - pluginManager.On("GetObjectStore", "objStoreProvider").Return(objectStore, nil) - pluginManager.On("CleanupClients").Return(nil) + c.newBackupStore = func(loc *arkv1api.BackupStorageLocation, _ persistence.ObjectStoreGetter, _ logrus.FieldLogger) (persistence.BackupStore, error) { + // this gets populated just below, prior to exercising the method under test + return backupStores[loc.Name], nil + } - objectStore.On("Init", mock.Anything).Return(nil) + pluginManager.On("CleanupClients").Return(nil) for _, location := range test.locations { require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(location)) + backupStores[location.Name] = &persistencemocks.BackupStore{} } - c.listCloudBackups = func(_ logrus.FieldLogger, _ cloudprovider.ObjectStore, bucket string) ([]*arkv1api.Backup, error) { - backups, ok := test.cloudBackups[bucket] - if !ok { - return nil, errors.New("bucket not found") - } + for _, location := range test.locations { + backupStore, ok := backupStores[location.Name] + require.True(t, ok, "no mock backup store for location %s", location.Name) - return backups, nil + backupStore.On("ListBackups").Return(test.cloudBackups[location.Spec.ObjectStorage.Bucket], nil) } for _, existingBackup := range test.existingBackups { diff --git a/pkg/controller/download_request_controller.go b/pkg/controller/download_request_controller.go index 747e6ce40..0a224f1d9 100644 --- a/pkg/controller/download_request_controller.go +++ b/pkg/controller/download_request_controller.go @@ -47,10 +47,10 @@ type downloadRequestController struct { downloadRequestLister listers.DownloadRequestLister restoreLister listers.RestoreLister clock clock.Clock - createSignedURL persistence.CreateSignedURLFunc backupLocationLister listers.BackupStorageLocationLister backupLister listers.BackupLister newPluginManager func(logrus.FieldLogger) plugin.Manager + newBackupStore func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) } // NewDownloadRequestController creates a new DownloadRequestController. @@ -73,8 +73,8 @@ func NewDownloadRequestController( // use variables to refer to these functions so they can be // replaced with fakes for testing. - createSignedURL: persistence.CreateSignedURL, newPluginManager: newPluginManager, + newBackupStore: persistence.NewObjectBackupStore, clock: &clock.RealClock{}, } @@ -146,8 +146,8 @@ func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.Dow update := downloadRequest.DeepCopy() var ( - directory string - err error + backupName string + err error ) switch downloadRequest.Spec.Target.Kind { @@ -157,12 +157,12 @@ func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.Dow return errors.Wrap(err, "error getting Restore") } - directory = restore.Spec.BackupName + backupName = restore.Spec.BackupName default: - directory = downloadRequest.Spec.Target.Name + backupName = downloadRequest.Spec.Target.Name } - backup, err := c.backupLister.Backups(downloadRequest.Namespace).Get(directory) + backup, err := c.backupLister.Backups(downloadRequest.Namespace).Get(backupName) if err != nil { return errors.WithStack(err) } @@ -175,18 +175,17 @@ func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.Dow pluginManager := c.newPluginManager(log) defer pluginManager.CleanupClients() - objectStore, err := getObjectStoreForLocation(backupLocation, pluginManager) + backupStore, err := c.newBackupStore(backupLocation, pluginManager, log) if err != nil { return errors.WithStack(err) } - update.Status.DownloadURL, err = c.createSignedURL(objectStore, downloadRequest.Spec.Target, backupLocation.Spec.ObjectStorage.Bucket, directory, signedURLTTL) - if err != nil { + if update.Status.DownloadURL, err = backupStore.GetDownloadURL(backupName, downloadRequest.Spec.Target); err != nil { return err } update.Status.Phase = v1.DownloadRequestPhaseProcessed - update.Status.Expiration = metav1.NewTime(c.clock.Now().Add(signedURLTTL)) + update.Status.Expiration = metav1.NewTime(c.clock.Now().Add(persistence.DownloadURLTTL)) _, err = patchDownloadRequest(downloadRequest, update, c.downloadRequestClient) return errors.WithStack(err) diff --git a/pkg/controller/download_request_controller_test.go b/pkg/controller/download_request_controller_test.go index 7098d9f11..8411d31d5 100644 --- a/pkg/controller/download_request_controller_test.go +++ b/pkg/controller/download_request_controller_test.go @@ -22,7 +22,6 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -32,6 +31,8 @@ import ( "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" informers "github.com/heptio/ark/pkg/generated/informers/externalversions" + "github.com/heptio/ark/pkg/persistence" + persistencemocks "github.com/heptio/ark/pkg/persistence/mocks" "github.com/heptio/ark/pkg/plugin" pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" kubeutil "github.com/heptio/ark/pkg/util/kube" @@ -42,7 +43,7 @@ type downloadRequestTestHarness struct { client *fake.Clientset informerFactory informers.SharedInformerFactory pluginManager *pluginmocks.Manager - objectStore *arktest.ObjectStore + backupStore *persistencemocks.BackupStore controller *downloadRequestController } @@ -52,7 +53,7 @@ func newDownloadRequestTestHarness(t *testing.T) *downloadRequestTestHarness { client = fake.NewSimpleClientset() informerFactory = informers.NewSharedInformerFactory(client, 0) pluginManager = new(pluginmocks.Manager) - objectStore = new(arktest.ObjectStore) + backupStore = new(persistencemocks.BackupStore) controller = NewDownloadRequestController( client.ArkV1(), informerFactory.Ark().V1().DownloadRequests(), @@ -66,17 +67,19 @@ func newDownloadRequestTestHarness(t *testing.T) *downloadRequestTestHarness { clockTime, err := time.Parse(time.RFC1123, time.RFC1123) require.NoError(t, err) - controller.clock = clock.NewFakeClock(clockTime) + controller.newBackupStore = func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) { + return backupStore, nil + } + pluginManager.On("CleanupClients").Return() - objectStore.On("Init", mock.Anything).Return(nil) return &downloadRequestTestHarness{ client: client, informerFactory: informerFactory, pluginManager: pluginManager, - objectStore: objectStore, + backupStore: backupStore, controller: controller, } } @@ -118,15 +121,15 @@ func newBackupLocation(name, provider, bucket string) *v1.BackupStorageLocation func TestProcessDownloadRequest(t *testing.T) { tests := []struct { - name string - key string - downloadRequest *v1.DownloadRequest - backup *v1.Backup - restore *v1.Restore - backupLocation *v1.BackupStorageLocation - expired bool - expectedErr string - expectedRequestedObject string + name string + key string + downloadRequest *v1.DownloadRequest + backup *v1.Backup + restore *v1.Restore + backupLocation *v1.BackupStorageLocation + expired bool + expectedErr string + expectGetsURL bool }{ { name: "empty key returns without error", @@ -163,64 +166,64 @@ func TestProcessDownloadRequest(t *testing.T) { expectedErr: "backupstoragelocation.ark.heptio.com \"a-location\" not found", }, { - name: "backup contents request with phase '' gets a url", - downloadRequest: newDownloadRequest("", v1.DownloadTargetKindBackupContents, "a-backup"), - backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, - backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), - expectedRequestedObject: "a-backup/a-backup.tar.gz", + name: "backup contents request with phase '' gets a url", + downloadRequest: newDownloadRequest("", v1.DownloadTargetKindBackupContents, "a-backup"), + backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, + backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), + expectGetsURL: true, }, { - name: "backup contents request with phase 'New' gets a url", - downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindBackupContents, "a-backup"), - backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, - backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), - expectedRequestedObject: "a-backup/a-backup.tar.gz", + name: "backup contents request with phase 'New' gets a url", + downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindBackupContents, "a-backup"), + backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, + backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), + expectGetsURL: true, }, { - name: "backup log request with phase '' gets a url", - downloadRequest: newDownloadRequest("", v1.DownloadTargetKindBackupLog, "a-backup"), - backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, - backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), - expectedRequestedObject: "a-backup/a-backup-logs.gz", + name: "backup log request with phase '' gets a url", + downloadRequest: newDownloadRequest("", v1.DownloadTargetKindBackupLog, "a-backup"), + backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, + backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), + expectGetsURL: true, }, { - name: "backup log request with phase 'New' gets a url", - downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindBackupLog, "a-backup"), - backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, - backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), - expectedRequestedObject: "a-backup/a-backup-logs.gz", + name: "backup log request with phase 'New' gets a url", + downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindBackupLog, "a-backup"), + backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, + backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), + expectGetsURL: true, }, { - name: "restore log request with phase '' gets a url", - downloadRequest: newDownloadRequest("", v1.DownloadTargetKindRestoreLog, "a-backup-20170912150214"), - restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, - backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, - backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), - expectedRequestedObject: "a-backup/restore-a-backup-20170912150214-logs.gz", + name: "restore log request with phase '' gets a url", + downloadRequest: newDownloadRequest("", v1.DownloadTargetKindRestoreLog, "a-backup-20170912150214"), + restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, + backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, + backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), + expectGetsURL: true, }, { - name: "restore log request with phase 'New' gets a url", - downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindRestoreLog, "a-backup-20170912150214"), - restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, - backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, - backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), - expectedRequestedObject: "a-backup/restore-a-backup-20170912150214-logs.gz", + name: "restore log request with phase 'New' gets a url", + downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindRestoreLog, "a-backup-20170912150214"), + restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, + backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, + backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), + expectGetsURL: true, }, { - name: "restore results request with phase '' gets a url", - downloadRequest: newDownloadRequest("", v1.DownloadTargetKindRestoreResults, "a-backup-20170912150214"), - restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, - backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, - backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), - expectedRequestedObject: "a-backup/restore-a-backup-20170912150214-results.gz", + name: "restore results request with phase '' gets a url", + downloadRequest: newDownloadRequest("", v1.DownloadTargetKindRestoreResults, "a-backup-20170912150214"), + restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, + backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, + backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), + expectGetsURL: true, }, { - name: "restore results request with phase 'New' gets a url", - downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindRestoreResults, "a-backup-20170912150214"), - restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, - backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, - backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), - expectedRequestedObject: "a-backup/restore-a-backup-20170912150214-results.gz", + name: "restore results request with phase 'New' gets a url", + downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindRestoreResults, "a-backup-20170912150214"), + restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, + backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, + backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), + expectGetsURL: true, }, { name: "request with phase 'Processed' is not deleted if not expired", @@ -268,12 +271,10 @@ func TestProcessDownloadRequest(t *testing.T) { if tc.backupLocation != nil { require.NoError(t, harness.informerFactory.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(tc.backupLocation)) - - harness.pluginManager.On("GetObjectStore", tc.backupLocation.Spec.Provider).Return(harness.objectStore, nil) } - if tc.expectedRequestedObject != "" { - harness.objectStore.On("CreateSignedURL", tc.backupLocation.Spec.ObjectStorage.Bucket, tc.expectedRequestedObject, mock.Anything).Return("a-url", nil) + if tc.expectGetsURL { + harness.backupStore.On("GetDownloadURL", tc.backup.Name, tc.downloadRequest.Spec.Target).Return("a-url", nil) } // exercise method under test @@ -291,7 +292,7 @@ func TestProcessDownloadRequest(t *testing.T) { assert.Nil(t, err) } - if tc.expectedRequestedObject != "" { + if tc.expectGetsURL { output, err := harness.client.ArkV1().DownloadRequests(tc.downloadRequest.Namespace).Get(tc.downloadRequest.Name, metav1.GetOptions{}) require.NoError(t, err) diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go index f4b01d2ff..aaf01c65f 100644 --- a/pkg/controller/restore_controller.go +++ b/pkg/controller/restore_controller.go @@ -41,7 +41,6 @@ import ( "k8s.io/client-go/util/workqueue" api "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/cloudprovider" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" @@ -90,11 +89,8 @@ type restoreController struct { defaultBackupLocation string metrics *metrics.ServerMetrics - getBackup persistence.GetBackupFunc - downloadBackup persistence.DownloadBackupFunc - uploadRestoreLog persistence.UploadRestoreLogFunc - uploadRestoreResults persistence.UploadRestoreResultsFunc - newPluginManager func(logger logrus.FieldLogger) plugin.Manager + newPluginManager func(logger logrus.FieldLogger) plugin.Manager + newBackupStore func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) } func NewRestoreController( @@ -132,11 +128,8 @@ func NewRestoreController( // use variables to refer to these functions so they can be // replaced with fakes for testing. - newPluginManager: newPluginManager, - getBackup: persistence.GetBackup, - downloadBackup: persistence.DownloadBackup, - uploadRestoreLog: persistence.UploadRestoreLog, - uploadRestoreResults: persistence.UploadRestoreResults, + newPluginManager: newPluginManager, + newBackupStore: persistence.NewObjectBackupStore, } c.syncHandler = c.processRestore @@ -354,9 +347,8 @@ func (c *restoreController) processRestore(key string) error { } type backupInfo struct { - bucketName string backup *api.Backup - objectStore cloudprovider.ObjectStore + backupStore persistence.BackupStore } func (c *restoreController) validateAndComplete(restore *api.Restore, pluginManager plugin.Manager) backupInfo { @@ -469,9 +461,7 @@ func mostRecentCompletedBackup(backups []*api.Backup) *api.Backup { // fetchBackupInfo checks the backup lister for a backup that matches the given name. If it doesn't // find it, it tries to retrieve it from one of the backup storage locations. func (c *restoreController) fetchBackupInfo(backupName string, pluginManager plugin.Manager) (backupInfo, error) { - var info backupInfo - var err error - info.backup, err = c.backupLister.Backups(c.namespace).Get(backupName) + backup, err := c.backupLister.Backups(c.namespace).Get(backupName) if err != nil { if !apierrors.IsNotFound(err) { return backupInfo{}, errors.WithStack(err) @@ -482,18 +472,20 @@ func (c *restoreController) fetchBackupInfo(backupName string, pluginManager plu return c.fetchFromBackupStorage(backupName, pluginManager) } - location, err := c.backupLocationLister.BackupStorageLocations(c.namespace).Get(info.backup.Spec.StorageLocation) + location, err := c.backupLocationLister.BackupStorageLocations(c.namespace).Get(backup.Spec.StorageLocation) if err != nil { return backupInfo{}, errors.WithStack(err) } - info.objectStore, err = getObjectStoreForLocation(location, pluginManager) + backupStore, err := c.newBackupStore(location, pluginManager, c.logger) if err != nil { - return backupInfo{}, errors.Wrap(err, "error initializing object store") + return backupInfo{}, err } - info.bucketName = location.Spec.ObjectStorage.Bucket - return info, nil + return backupInfo{ + backup: backup, + backupStore: backupStore, + }, nil } // fetchFromBackupStorage checks each backup storage location, starting with the default, @@ -541,12 +533,12 @@ func orderedBackupLocations(locations []*api.BackupStorageLocation, defaultLocat } func (c *restoreController) backupInfoForLocation(location *api.BackupStorageLocation, backupName string, pluginManager plugin.Manager) (backupInfo, error) { - objectStore, err := getObjectStoreForLocation(location, pluginManager) + backupStore, err := persistence.NewObjectBackupStore(location, pluginManager, c.logger) if err != nil { return backupInfo{}, err } - backup, err := c.getBackup(objectStore, location.Spec.ObjectStorage.Bucket, backupName) + backup, err := backupStore.GetBackupMetadata(backupName) if err != nil { return backupInfo{}, err } @@ -562,9 +554,8 @@ func (c *restoreController) backupInfoForLocation(location *api.BackupStorageLoc } return backupInfo{ - bucketName: location.Spec.ObjectStorage.Bucket, backup: backupCreated, - objectStore: objectStore, + backupStore: backupStore, }, nil } @@ -603,7 +594,7 @@ func (c *restoreController) runRestore( "backup": restore.Spec.BackupName, }) - backupFile, err := downloadToTempFile(info.objectStore, info.bucketName, restore.Spec.BackupName, c.downloadBackup, c.logger) + backupFile, err := downloadToTempFile(restore.Spec.BackupName, info.backupStore, c.logger) if err != nil { logContext.WithError(err).Error("Error downloading backup") restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) @@ -637,8 +628,8 @@ func (c *restoreController) runRestore( return } - if err := c.uploadRestoreLog(info.objectStore, info.bucketName, restore.Spec.BackupName, restore.Name, logFile); err != nil { - restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to object storage: %v", err)) + if err := info.backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logFile); err != nil { + restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to backup storage: %v", err)) } m := map[string]api.RestoreResult{ @@ -658,20 +649,19 @@ func (c *restoreController) runRestore( logContext.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0") return } - if err := c.uploadRestoreResults(info.objectStore, info.bucketName, restore.Spec.BackupName, restore.Name, resultsFile); err != nil { - logContext.WithError(errors.WithStack(err)).Error("Error uploading results files to object storage") + if err := info.backupStore.PutRestoreResults(restore.Spec.BackupName, restore.Name, resultsFile); err != nil { + logContext.WithError(errors.WithStack(err)).Error("Error uploading results file to backup storage") } return } func downloadToTempFile( - objectStore cloudprovider.ObjectStore, - bucket, backupName string, - downloadBackup persistence.DownloadBackupFunc, + backupName string, + backupStore persistence.BackupStore, logger logrus.FieldLogger, ) (*os.File, error) { - readCloser, err := downloadBackup(objectStore, bucket, backupName) + readCloser, err := backupStore.GetBackupContents(backupName) if err != nil { return nil, err } diff --git a/pkg/controller/restore_controller_test.go b/pkg/controller/restore_controller_test.go index 407a30739..257645cfd 100644 --- a/pkg/controller/restore_controller_test.go +++ b/pkg/controller/restore_controller_test.go @@ -29,16 +29,18 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" api "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" informers "github.com/heptio/ark/pkg/generated/informers/externalversions" "github.com/heptio/ark/pkg/metrics" + "github.com/heptio/ark/pkg/persistence" + persistencemocks "github.com/heptio/ark/pkg/persistence/mocks" "github.com/heptio/ark/pkg/plugin" pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" "github.com/heptio/ark/pkg/restore" @@ -48,14 +50,14 @@ import ( func TestFetchBackupInfo(t *testing.T) { tests := []struct { - name string - backupName string - informerLocations []*api.BackupStorageLocation - informerBackups []*api.Backup - backupServiceBackup *api.Backup - backupServiceError error - expectedRes *api.Backup - expectedErr bool + name string + backupName string + informerLocations []*api.BackupStorageLocation + informerBackups []*api.Backup + backupStoreBackup *api.Backup + backupStoreError error + expectedRes *api.Backup + expectedErr bool }{ { name: "lister has backup", @@ -65,18 +67,18 @@ func TestFetchBackupInfo(t *testing.T) { expectedRes: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup, }, { - name: "lister does not have a backup, but backupSvc does", - backupName: "backup-1", - backupServiceBackup: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup, - informerLocations: []*api.BackupStorageLocation{arktest.NewTestBackupStorageLocation().WithName("default").WithProvider("myCloud").WithObjectStorage("bucket").BackupStorageLocation}, - informerBackups: []*api.Backup{arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup}, - expectedRes: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup, + name: "lister does not have a backup, but backupSvc does", + backupName: "backup-1", + backupStoreBackup: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup, + informerLocations: []*api.BackupStorageLocation{arktest.NewTestBackupStorageLocation().WithName("default").WithProvider("myCloud").WithObjectStorage("bucket").BackupStorageLocation}, + informerBackups: []*api.Backup{arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup}, + expectedRes: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup, }, { - name: "no backup", - backupName: "backup-1", - backupServiceError: errors.New("no backup here"), - expectedErr: true, + name: "no backup", + backupName: "backup-1", + backupStoreError: errors.New("no backup here"), + expectedErr: true, }, } @@ -88,11 +90,11 @@ func TestFetchBackupInfo(t *testing.T) { sharedInformers = informers.NewSharedInformerFactory(client, 0) logger = arktest.NewLogger() pluginManager = &pluginmocks.Manager{} - objectStore = &arktest.ObjectStore{} + backupStore = &persistencemocks.BackupStore{} ) defer restorer.AssertExpectations(t) - defer objectStore.AssertExpectations(t) + defer backupStore.AssertExpectations(t) c := NewRestoreController( api.DefaultNamespace, @@ -110,10 +112,11 @@ func TestFetchBackupInfo(t *testing.T) { metrics.NewServerMetrics(), ).(*restoreController) - if test.backupServiceError == nil { - pluginManager.On("GetObjectStore", "myCloud").Return(objectStore, nil) - objectStore.On("Init", mock.Anything).Return(nil) + c.newBackupStore = func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) { + return backupStore, nil + } + if test.backupStoreError == nil { for _, itm := range test.informerLocations { sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(itm) } @@ -123,19 +126,23 @@ func TestFetchBackupInfo(t *testing.T) { } } - if test.backupServiceBackup != nil || test.backupServiceError != nil { - c.getBackup = func(_ cloudprovider.ObjectStore, bucket, backup string) (*api.Backup, error) { - require.Equal(t, "bucket", bucket) - require.Equal(t, test.backupName, backup) - return test.backupServiceBackup, test.backupServiceError - } + if test.backupStoreBackup != nil && test.backupStoreError != nil { + panic("developer error - only one of backupStoreBackup, backupStoreError can be non-nil") + } + + if test.backupStoreError != nil { + // TODO why do I need .Maybe() here? + backupStore.On("GetBackupMetadata", test.backupName).Return(nil, test.backupStoreError).Maybe() + } + if test.backupStoreBackup != nil { + // TODO why do I need .Maybe() here? + backupStore.On("GetBackupMetadata", test.backupName).Return(test.backupStoreBackup, nil).Maybe() } info, err := c.fetchBackupInfo(test.backupName, pluginManager) - if assert.Equal(t, test.expectedErr, err != nil) { - assert.Equal(t, test.expectedRes, info.backup) - } + require.Equal(t, test.expectedErr, err != nil) + assert.Equal(t, test.expectedRes, info.backup) }) } } @@ -180,11 +187,7 @@ func TestProcessRestoreSkips(t *testing.T) { restorer = &fakeRestorer{} sharedInformers = informers.NewSharedInformerFactory(client, 0) logger = arktest.NewLogger() - pluginManager = &pluginmocks.Manager{} - objectStore = &arktest.ObjectStore{} ) - defer restorer.AssertExpectations(t) - defer objectStore.AssertExpectations(t) c := NewRestoreController( api.DefaultNamespace, @@ -197,7 +200,7 @@ func TestProcessRestoreSkips(t *testing.T) { false, // pvProviderExists logger, logrus.InfoLevel, - func(logrus.FieldLogger) plugin.Manager { return pluginManager }, + nil, "default", metrics.NewServerMetrics(), ).(*restoreController) @@ -207,6 +210,7 @@ func TestProcessRestoreSkips(t *testing.T) { } err := c.processRestore(test.restoreKey) + assert.Equal(t, test.expectError, err != nil) }) } @@ -214,22 +218,22 @@ func TestProcessRestoreSkips(t *testing.T) { func TestProcessRestore(t *testing.T) { tests := []struct { - name string - restoreKey string - location *api.BackupStorageLocation - restore *api.Restore - backup *api.Backup - restorerError error - allowRestoreSnapshots bool - expectedErr bool - expectedPhase string - expectedValidationErrors []string - expectedRestoreErrors int - expectedRestorerCall *api.Restore - backupServiceGetBackupError error - uploadLogError error - backupServiceDownloadBackupError error - expectedFinalPhase string + name string + restoreKey string + location *api.BackupStorageLocation + restore *api.Restore + backup *api.Backup + restorerError error + allowRestoreSnapshots bool + expectedErr bool + expectedPhase string + expectedValidationErrors []string + expectedRestoreErrors int + expectedRestorerCall *api.Restore + backupStoreGetBackupMetadataErr error + backupStoreGetBackupContentsErr error + putRestoreLogErr error + expectedFinalPhase string }{ { name: "restore with both namespace in both includedNamespaces and excludedNamespaces fails validation", @@ -279,12 +283,12 @@ func TestProcessRestore(t *testing.T) { expectedRestorerCall: NewRestore("foo", "bar", "backup-1", "ns-1", "", api.RestorePhaseInProgress).WithSchedule("sched-1").Restore, }, { - name: "restore with non-existent backup name fails", - restore: NewRestore("foo", "bar", "backup-1", "ns-1", "*", api.RestorePhaseNew).Restore, - expectedErr: false, - expectedPhase: string(api.RestorePhaseFailedValidation), - expectedValidationErrors: []string{"Error retrieving backup: not able to fetch from backup storage"}, - backupServiceGetBackupError: errors.New("no backup here"), + name: "restore with non-existent backup name fails", + restore: NewRestore("foo", "bar", "backup-1", "ns-1", "*", api.RestorePhaseNew).Restore, + expectedErr: false, + expectedPhase: string(api.RestorePhaseFailedValidation), + expectedValidationErrors: []string{"Error retrieving backup: not able to fetch from backup storage"}, + backupStoreGetBackupMetadataErr: errors.New("no backup here"), }, { name: "restorer throwing an error causes the restore to fail", @@ -386,12 +390,12 @@ func TestProcessRestore(t *testing.T) { }, }, { - name: "backup download error results in failed restore", - location: arktest.NewTestBackupStorageLocation().WithName("default").WithProvider("myCloud").WithObjectStorage("bucket").BackupStorageLocation, - restore: NewRestore(api.DefaultNamespace, "bar", "backup-1", "ns-1", "", api.RestorePhaseNew).Restore, - expectedPhase: string(api.RestorePhaseInProgress), - expectedFinalPhase: string(api.RestorePhaseFailed), - backupServiceDownloadBackupError: errors.New("Couldn't download backup"), + name: "backup download error results in failed restore", + location: arktest.NewTestBackupStorageLocation().WithName("default").WithProvider("myCloud").WithObjectStorage("bucket").BackupStorageLocation, + restore: NewRestore(api.DefaultNamespace, "bar", "backup-1", "ns-1", "", api.RestorePhaseNew).Restore, + expectedPhase: string(api.RestorePhaseInProgress), + expectedFinalPhase: string(api.RestorePhaseFailed), + backupStoreGetBackupContentsErr: errors.New("Couldn't download backup"), backup: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup, }, } @@ -404,11 +408,11 @@ func TestProcessRestore(t *testing.T) { sharedInformers = informers.NewSharedInformerFactory(client, 0) logger = arktest.NewLogger() pluginManager = &pluginmocks.Manager{} - objectStore = &arktest.ObjectStore{} + backupStore = &persistencemocks.BackupStore{} ) - defer restorer.AssertExpectations(t) - defer objectStore.AssertExpectations(t) + defer restorer.AssertExpectations(t) + defer backupStore.AssertExpectations(t) c := NewRestoreController( api.DefaultNamespace, @@ -426,13 +430,15 @@ func TestProcessRestore(t *testing.T) { metrics.NewServerMetrics(), ).(*restoreController) + c.newBackupStore = func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) { + return backupStore, nil + } + if test.location != nil { sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(test.location) } if test.backup != nil { sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup) - pluginManager.On("GetObjectStore", "myCloud").Return(objectStore, nil) - objectStore.On("Init", mock.Anything).Return(nil) } if test.restore != nil { @@ -481,28 +487,17 @@ func TestProcessRestore(t *testing.T) { if test.restorerError != nil { errors.Namespaces = map[string][]string{"ns-1": {test.restorerError.Error()}} } - if test.uploadLogError != nil { - errors.Ark = append(errors.Ark, "error uploading log file to object storage: "+test.uploadLogError.Error()) + if test.putRestoreLogErr != nil { + errors.Ark = append(errors.Ark, "error uploading log file to object storage: "+test.putRestoreLogErr.Error()) } if test.expectedRestorerCall != nil { - c.downloadBackup = func(objectStore cloudprovider.ObjectStore, bucket, backup string) (io.ReadCloser, error) { - require.Equal(t, test.backup.Name, backup) - return ioutil.NopCloser(bytes.NewReader([]byte("hello world"))), nil - } + backupStore.On("GetBackupContents", test.backup.Name).Return(ioutil.NopCloser(bytes.NewReader([]byte("hello world"))), nil) restorer.On("Restore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(warnings, errors) - c.uploadRestoreLog = func(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, log io.Reader) error { - require.Equal(t, test.backup.Name, backup) - require.Equal(t, test.restore.Name, restore) - return test.uploadLogError - } + backupStore.On("PutRestoreLog", test.backup.Name, test.restore.Name, mock.Anything).Return(test.putRestoreLogErr) - c.uploadRestoreResults = func(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, results io.Reader) error { - require.Equal(t, test.backup.Name, backup) - require.Equal(t, test.restore.Name, restore) - return nil - } + backupStore.On("PutRestoreResults", test.backup.Name, test.restore.Name, mock.Anything).Return(nil) } var ( @@ -516,20 +511,14 @@ func TestProcessRestore(t *testing.T) { } } - if test.backupServiceGetBackupError != nil { - c.getBackup = func(_ cloudprovider.ObjectStore, bucket, backup string) (*api.Backup, error) { - require.Equal(t, "bucket", bucket) - require.Equal(t, test.restore.Spec.BackupName, backup) - return nil, test.backupServiceGetBackupError - } + if test.backupStoreGetBackupMetadataErr != nil { + // TODO why do I need .Maybe() here? + backupStore.On("GetBackupMetadata", test.restore.Spec.BackupName).Return(nil, test.backupStoreGetBackupMetadataErr).Maybe() } - if test.backupServiceDownloadBackupError != nil { - c.downloadBackup = func(_ cloudprovider.ObjectStore, bucket, backupName string) (io.ReadCloser, error) { - require.Equal(t, "bucket", bucket) - require.Equal(t, test.restore.Spec.BackupName, backupName) - return nil, test.backupServiceDownloadBackupError - } + if test.backupStoreGetBackupContentsErr != nil { + // TODO why do I need .Maybe() here? + backupStore.On("GetBackupContents", test.restore.Spec.BackupName).Return(nil, test.backupStoreGetBackupContentsErr).Maybe() } if test.restore != nil { diff --git a/pkg/persistence/mocks/backup_store.go b/pkg/persistence/mocks/backup_store.go new file mode 100644 index 000000000..d0850f890 --- /dev/null +++ b/pkg/persistence/mocks/backup_store.go @@ -0,0 +1,158 @@ +// Code generated by mockery v1.0.0 +package mocks + +import io "io" +import mock "github.com/stretchr/testify/mock" + +import v1 "github.com/heptio/ark/pkg/apis/ark/v1" + +// BackupStore is an autogenerated mock type for the BackupStore type +type BackupStore struct { + mock.Mock +} + +// DeleteBackup provides a mock function with given fields: name +func (_m *BackupStore) DeleteBackup(name string) error { + ret := _m.Called(name) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(name) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetBackupContents provides a mock function with given fields: name +func (_m *BackupStore) GetBackupContents(name string) (io.ReadCloser, error) { + ret := _m.Called(name) + + var r0 io.ReadCloser + if rf, ok := ret.Get(0).(func(string) io.ReadCloser); ok { + r0 = rf(name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(io.ReadCloser) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(name) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetBackupMetadata provides a mock function with given fields: name +func (_m *BackupStore) GetBackupMetadata(name string) (*v1.Backup, error) { + ret := _m.Called(name) + + var r0 *v1.Backup + if rf, ok := ret.Get(0).(func(string) *v1.Backup); ok { + r0 = rf(name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1.Backup) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(name) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetDownloadURL provides a mock function with given fields: backup, target +func (_m *BackupStore) GetDownloadURL(backup string, target v1.DownloadTarget) (string, error) { + ret := _m.Called(backup, target) + + var r0 string + if rf, ok := ret.Get(0).(func(string, v1.DownloadTarget) string); ok { + r0 = rf(backup, target) + } else { + r0 = ret.Get(0).(string) + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, v1.DownloadTarget) error); ok { + r1 = rf(backup, target) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ListBackups provides a mock function with given fields: +func (_m *BackupStore) ListBackups() ([]*v1.Backup, error) { + ret := _m.Called() + + var r0 []*v1.Backup + if rf, ok := ret.Get(0).(func() []*v1.Backup); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*v1.Backup) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// PutBackup provides a mock function with given fields: name, metadata, contents, log +func (_m *BackupStore) PutBackup(name string, metadata io.Reader, contents io.Reader, log io.Reader) error { + ret := _m.Called(name, metadata, contents, log) + + var r0 error + if rf, ok := ret.Get(0).(func(string, io.Reader, io.Reader, io.Reader) error); ok { + r0 = rf(name, metadata, contents, log) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// PutRestoreLog provides a mock function with given fields: backup, restore, log +func (_m *BackupStore) PutRestoreLog(backup string, restore string, log io.Reader) error { + ret := _m.Called(backup, restore, log) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string, io.Reader) error); ok { + r0 = rf(backup, restore, log) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// PutRestoreResults provides a mock function with given fields: backup, restore, results +func (_m *BackupStore) PutRestoreResults(backup string, restore string, results io.Reader) error { + ret := _m.Called(backup, restore, results) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string, io.Reader) error); ok { + r0 = rf(backup, restore, results) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/pkg/persistence/object_storage_persistence.go b/pkg/persistence/object_storage_persistence.go deleted file mode 100644 index 151ceeb62..000000000 --- a/pkg/persistence/object_storage_persistence.go +++ /dev/null @@ -1,267 +0,0 @@ -/* -Copyright 2017 the Heptio Ark contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistence - -import ( - "fmt" - "io" - "io/ioutil" - "time" - - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - - kerrors "k8s.io/apimachinery/pkg/util/errors" - - api "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/cloudprovider" - "github.com/heptio/ark/pkg/generated/clientset/versioned/scheme" -) - -// BackupLister knows how to list backups in object storage. -type BackupLister interface { - // ListBackups lists all the api.Backups in object storage for the given bucket. - ListBackups(bucket string) ([]*api.Backup, error) -} - -const ( - metadataFileFormatString = "%s/ark-backup.json" - backupFileFormatString = "%s/%s.tar.gz" - backupLogFileFormatString = "%s/%s-logs.gz" - restoreLogFileFormatString = "%s/restore-%s-logs.gz" - restoreResultsFileFormatString = "%s/restore-%s-results.gz" -) - -func getMetadataKey(directory string) string { - return fmt.Sprintf(metadataFileFormatString, directory) -} - -func getBackupContentsKey(directory, backup string) string { - return fmt.Sprintf(backupFileFormatString, directory, backup) -} - -func getBackupLogKey(directory, backup string) string { - return fmt.Sprintf(backupLogFileFormatString, directory, backup) -} - -func getRestoreLogKey(directory, restore string) string { - return fmt.Sprintf(restoreLogFileFormatString, directory, restore) -} - -func getRestoreResultsKey(directory, restore string) string { - return fmt.Sprintf(restoreResultsFileFormatString, directory, restore) -} - -func seekToBeginning(r io.Reader) error { - seeker, ok := r.(io.Seeker) - if !ok { - return nil - } - - _, err := seeker.Seek(0, 0) - return err -} - -func seekAndPutObject(objectStore cloudprovider.ObjectStore, bucket, key string, file io.Reader) error { - if file == nil { - return nil - } - - if err := seekToBeginning(file); err != nil { - return errors.WithStack(err) - } - - return objectStore.PutObject(bucket, key, file) -} - -func UploadBackupLog(objectStore cloudprovider.ObjectStore, bucket, backupName string, log io.Reader) error { - logKey := getBackupLogKey(backupName, backupName) - return seekAndPutObject(objectStore, bucket, logKey, log) -} - -func UploadBackupMetadata(objectStore cloudprovider.ObjectStore, bucket, backupName string, metadata io.Reader) error { - metadataKey := getMetadataKey(backupName) - return seekAndPutObject(objectStore, bucket, metadataKey, metadata) -} - -func DeleteBackupMetadata(objectStore cloudprovider.ObjectStore, bucket, backupName string) error { - metadataKey := getMetadataKey(backupName) - return objectStore.DeleteObject(bucket, metadataKey) -} - -func UploadBackupData(objectStore cloudprovider.ObjectStore, bucket, backupName string, backup io.Reader) error { - backupKey := getBackupContentsKey(backupName, backupName) - return seekAndPutObject(objectStore, bucket, backupKey, backup) -} - -func UploadBackup(logger logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket, backupName string, metadata, backup, log io.Reader) error { - if err := UploadBackupLog(objectStore, bucket, backupName, log); err != nil { - // Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the - // backup's status. - logger.WithError(err).WithField("bucket", bucket).Error("Error uploading log file") - } - - if metadata == nil { - // If we don't have metadata, something failed, and there's no point in continuing. An object - // storage bucket that is missing the metadata file can't be restored, nor can its logs be - // viewed. - return nil - } - - // upload metadata file - if err := UploadBackupMetadata(objectStore, bucket, backupName, metadata); err != nil { - // failure to upload metadata file is a hard-stop - return err - } - - // upload tar file - if err := UploadBackupData(objectStore, bucket, backupName, backup); err != nil { - // try to delete the metadata file since the data upload failed - deleteErr := DeleteBackupMetadata(objectStore, bucket, backupName) - return kerrors.NewAggregate([]error{err, deleteErr}) - } - - return nil -} - -// DownloadBackupFunc is a function that can download backup metadata from a bucket in object storage. -type DownloadBackupFunc func(objectStore cloudprovider.ObjectStore, bucket, backupName string) (io.ReadCloser, error) - -// DownloadBackup downloads an Ark backup with the specified object key from object storage via the cloud API. -// It returns the snapshot metadata and data (separately), or an error if a problem is encountered -// downloading or reading the file from the cloud API. -func DownloadBackup(objectStore cloudprovider.ObjectStore, bucket, backupName string) (io.ReadCloser, error) { - return objectStore.GetObject(bucket, getBackupContentsKey(backupName, backupName)) -} - -func ListBackups(logger logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket string) ([]*api.Backup, error) { - prefixes, err := objectStore.ListCommonPrefixes(bucket, "/") - if err != nil { - return nil, err - } - if len(prefixes) == 0 { - return []*api.Backup{}, nil - } - - output := make([]*api.Backup, 0, len(prefixes)) - - for _, backupDir := range prefixes { - backup, err := GetBackup(objectStore, bucket, backupDir) - if err != nil { - logger.WithError(err).WithField("dir", backupDir).Error("Error reading backup directory") - continue - } - - output = append(output, backup) - } - - return output, nil -} - -//GetBackupFunc is a function that can retrieve backup metadata from an object store -type GetBackupFunc func(objectStore cloudprovider.ObjectStore, bucket, backupName string) (*api.Backup, error) - -// GetBackup gets the specified api.Backup from the given bucket in object storage. -func GetBackup(objectStore cloudprovider.ObjectStore, bucket, backupName string) (*api.Backup, error) { - key := getMetadataKey(backupName) - - res, err := objectStore.GetObject(bucket, key) - if err != nil { - return nil, err - } - defer res.Close() - - data, err := ioutil.ReadAll(res) - if err != nil { - return nil, errors.WithStack(err) - } - - decoder := scheme.Codecs.UniversalDecoder(api.SchemeGroupVersion) - obj, _, err := decoder.Decode(data, nil, nil) - if err != nil { - return nil, errors.WithStack(err) - } - - backup, ok := obj.(*api.Backup) - if !ok { - return nil, errors.Errorf("unexpected type for %s/%s: %T", bucket, key, obj) - } - - return backup, nil -} - -// DeleteBackupDirFunc is a function that can delete a backup directory from a bucket in object storage. -type DeleteBackupDirFunc func(logger logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket, backupName string) error - -// DeleteBackupDir deletes all files in object storage for the given backup. -func DeleteBackupDir(logger logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket, backupName string) error { - objects, err := objectStore.ListObjects(bucket, backupName+"/") - if err != nil { - return err - } - - var errs []error - for _, key := range objects { - logger.WithFields(logrus.Fields{ - "bucket": bucket, - "key": key, - }).Debug("Trying to delete object") - if err := objectStore.DeleteObject(bucket, key); err != nil { - errs = append(errs, err) - } - } - - return errors.WithStack(kerrors.NewAggregate(errs)) -} - -// CreateSignedURLFunc is a function that can create a signed URL for an object in object storage. -type CreateSignedURLFunc func(objectStore cloudprovider.ObjectStore, target api.DownloadTarget, bucket, directory string, ttl time.Duration) (string, error) - -// CreateSignedURL creates a pre-signed URL that can be used to download a file from object -// storage. The URL expires after ttl. -func CreateSignedURL(objectStore cloudprovider.ObjectStore, target api.DownloadTarget, bucket, directory string, ttl time.Duration) (string, error) { - switch target.Kind { - case api.DownloadTargetKindBackupContents: - return objectStore.CreateSignedURL(bucket, getBackupContentsKey(directory, target.Name), ttl) - case api.DownloadTargetKindBackupLog: - return objectStore.CreateSignedURL(bucket, getBackupLogKey(directory, target.Name), ttl) - case api.DownloadTargetKindRestoreLog: - return objectStore.CreateSignedURL(bucket, getRestoreLogKey(directory, target.Name), ttl) - case api.DownloadTargetKindRestoreResults: - return objectStore.CreateSignedURL(bucket, getRestoreResultsKey(directory, target.Name), ttl) - default: - return "", errors.Errorf("unsupported download target kind %q", target.Kind) - } -} - -// UploadRestoreLogFunc is a function that can upload a restore log to a bucket in object storage. -type UploadRestoreLogFunc func(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, log io.Reader) error - -// UploadRestoreLog uploads the restore's log file to object storage. -func UploadRestoreLog(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, log io.Reader) error { - key := getRestoreLogKey(backup, restore) - return objectStore.PutObject(bucket, key, log) -} - -// UploadRestoreResultsFunc is a function that can upload restore results to a bucket in object storage. -type UploadRestoreResultsFunc func(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, results io.Reader) error - -// UploadRestoreResults uploads the restore's results file to object storage. -func UploadRestoreResults(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, results io.Reader) error { - key := getRestoreResultsKey(backup, restore) - return objectStore.PutObject(bucket, key, results) -} diff --git a/pkg/persistence/object_storage_persistence_test.go b/pkg/persistence/object_storage_persistence_test.go deleted file mode 100644 index fc59cc3bd..000000000 --- a/pkg/persistence/object_storage_persistence_test.go +++ /dev/null @@ -1,366 +0,0 @@ -/* -Copyright 2017 the Heptio Ark contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistence - -import ( - "bytes" - "encoding/json" - "errors" - "io" - "io/ioutil" - "strings" - "testing" - "time" - - testutil "github.com/heptio/ark/pkg/util/test" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - - api "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/util/encode" - arktest "github.com/heptio/ark/pkg/util/test" -) - -func TestUploadBackup(t *testing.T) { - tests := []struct { - name string - metadata io.ReadSeeker - metadataError error - expectMetadataDelete bool - backup io.ReadSeeker - backupError error - expectBackupUpload bool - log io.ReadSeeker - logError error - expectedErr string - }{ - { - name: "normal case", - metadata: newStringReadSeeker("foo"), - backup: newStringReadSeeker("bar"), - expectBackupUpload: true, - log: newStringReadSeeker("baz"), - }, - { - name: "error on metadata upload does not upload data", - metadata: newStringReadSeeker("foo"), - metadataError: errors.New("md"), - log: newStringReadSeeker("baz"), - expectedErr: "md", - }, - { - name: "error on data upload deletes metadata", - metadata: newStringReadSeeker("foo"), - backup: newStringReadSeeker("bar"), - expectBackupUpload: true, - backupError: errors.New("backup"), - expectMetadataDelete: true, - expectedErr: "backup", - }, - { - name: "error on log upload is ok", - metadata: newStringReadSeeker("foo"), - backup: newStringReadSeeker("bar"), - expectBackupUpload: true, - log: newStringReadSeeker("baz"), - logError: errors.New("log"), - }, - { - name: "don't upload data when metadata is nil", - backup: newStringReadSeeker("bar"), - log: newStringReadSeeker("baz"), - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - var ( - objectStore = &testutil.ObjectStore{} - bucket = "test-bucket" - backupName = "test-backup" - logger = arktest.NewLogger() - ) - defer objectStore.AssertExpectations(t) - - if test.metadata != nil { - objectStore.On("PutObject", bucket, backupName+"/ark-backup.json", test.metadata).Return(test.metadataError) - } - if test.backup != nil && test.expectBackupUpload { - objectStore.On("PutObject", bucket, backupName+"/"+backupName+".tar.gz", test.backup).Return(test.backupError) - } - if test.log != nil { - objectStore.On("PutObject", bucket, backupName+"/"+backupName+"-logs.gz", test.log).Return(test.logError) - } - if test.expectMetadataDelete { - objectStore.On("DeleteObject", bucket, backupName+"/ark-backup.json").Return(nil) - } - - err := UploadBackup(logger, objectStore, bucket, backupName, test.metadata, test.backup, test.log) - - if test.expectedErr != "" { - assert.EqualError(t, err, test.expectedErr) - } else { - assert.NoError(t, err) - } - - }) - } -} - -func TestDownloadBackup(t *testing.T) { - var ( - objectStore = &testutil.ObjectStore{} - bucket = "b" - backup = "bak" - ) - objectStore.On("GetObject", bucket, backup+"/"+backup+".tar.gz").Return(ioutil.NopCloser(strings.NewReader("foo")), nil) - - rc, err := DownloadBackup(objectStore, bucket, backup) - require.NoError(t, err) - require.NotNil(t, rc) - data, err := ioutil.ReadAll(rc) - require.NoError(t, err) - assert.Equal(t, "foo", string(data)) - objectStore.AssertExpectations(t) -} - -func TestDeleteBackup(t *testing.T) { - tests := []struct { - name string - listObjectsError error - deleteErrors []error - expectedErr string - }{ - { - name: "normal case", - }, - { - name: "some delete errors, do as much as we can", - deleteErrors: []error{errors.New("a"), nil, errors.New("c")}, - expectedErr: "[a, c]", - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - var ( - bucket = "bucket" - backup = "bak" - objects = []string{"bak/ark-backup.json", "bak/bak.tar.gz", "bak/bak.log.gz"} - objectStore = &testutil.ObjectStore{} - logger = arktest.NewLogger() - ) - - objectStore.On("ListObjects", bucket, backup+"/").Return(objects, test.listObjectsError) - for i, obj := range objects { - var err error - if i < len(test.deleteErrors) { - err = test.deleteErrors[i] - } - - objectStore.On("DeleteObject", bucket, obj).Return(err) - } - - err := DeleteBackupDir(logger, objectStore, bucket, backup) - - if test.expectedErr != "" { - assert.EqualError(t, err, test.expectedErr) - } else { - assert.NoError(t, err) - } - - objectStore.AssertExpectations(t) - }) - } -} - -func TestGetAllBackups(t *testing.T) { - tests := []struct { - name string - storageData map[string][]byte - expectedRes []*api.Backup - expectedErr string - }{ - { - name: "normal case", - storageData: map[string][]byte{ - "backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}), - "backup-2/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-2"}}), - }, - expectedRes: []*api.Backup{ - { - TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"}, - ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}, - }, - { - TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"}, - ObjectMeta: metav1.ObjectMeta{Name: "backup-2"}, - }, - }, - }, - { - name: "backup that can't be decoded is ignored", - storageData: map[string][]byte{ - "backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}), - "backup-2/ark-backup.json": []byte("this is not valid backup JSON"), - }, - expectedRes: []*api.Backup{ - { - TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"}, - ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}, - }, - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - var ( - bucket = "bucket" - objectStore = &testutil.ObjectStore{} - logger = arktest.NewLogger() - ) - - objectStore.On("ListCommonPrefixes", bucket, "/").Return([]string{"backup-1", "backup-2"}, nil) - objectStore.On("GetObject", bucket, "backup-1/ark-backup.json").Return(ioutil.NopCloser(bytes.NewReader(test.storageData["backup-1/ark-backup.json"])), nil) - objectStore.On("GetObject", bucket, "backup-2/ark-backup.json").Return(ioutil.NopCloser(bytes.NewReader(test.storageData["backup-2/ark-backup.json"])), nil) - - res, err := ListBackups(logger, objectStore, bucket) - - if test.expectedErr != "" { - assert.EqualError(t, err, test.expectedErr) - } else { - assert.NoError(t, err) - } - - assert.Equal(t, test.expectedRes, res) - - objectStore.AssertExpectations(t) - }) - } -} - -func TestCreateSignedURL(t *testing.T) { - tests := []struct { - name string - targetKind api.DownloadTargetKind - targetName string - directory string - expectedKey string - }{ - { - name: "backup contents", - targetKind: api.DownloadTargetKindBackupContents, - targetName: "my-backup", - directory: "my-backup", - expectedKey: "my-backup/my-backup.tar.gz", - }, - { - name: "backup log", - targetKind: api.DownloadTargetKindBackupLog, - targetName: "my-backup", - directory: "my-backup", - expectedKey: "my-backup/my-backup-logs.gz", - }, - { - name: "scheduled backup contents", - targetKind: api.DownloadTargetKindBackupContents, - targetName: "my-backup-20170913154901", - directory: "my-backup-20170913154901", - expectedKey: "my-backup-20170913154901/my-backup-20170913154901.tar.gz", - }, - { - name: "scheduled backup log", - targetKind: api.DownloadTargetKindBackupLog, - targetName: "my-backup-20170913154901", - directory: "my-backup-20170913154901", - expectedKey: "my-backup-20170913154901/my-backup-20170913154901-logs.gz", - }, - { - name: "restore log", - targetKind: api.DownloadTargetKindRestoreLog, - targetName: "b-20170913154901", - directory: "b", - expectedKey: "b/restore-b-20170913154901-logs.gz", - }, - { - name: "restore results", - targetKind: api.DownloadTargetKindRestoreResults, - targetName: "b-20170913154901", - directory: "b", - expectedKey: "b/restore-b-20170913154901-results.gz", - }, - { - name: "restore results - backup has multiple dashes (e.g. restore of scheduled backup)", - targetKind: api.DownloadTargetKindRestoreResults, - targetName: "b-cool-20170913154901-20170913154902", - directory: "b-cool-20170913154901", - expectedKey: "b-cool-20170913154901/restore-b-cool-20170913154901-20170913154902-results.gz", - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - var ( - objectStore = &testutil.ObjectStore{} - ) - defer objectStore.AssertExpectations(t) - - target := api.DownloadTarget{ - Kind: test.targetKind, - Name: test.targetName, - } - objectStore.On("CreateSignedURL", "bucket", test.expectedKey, time.Duration(0)).Return("url", nil) - url, err := CreateSignedURL(objectStore, target, "bucket", test.directory, 0) - require.NoError(t, err) - assert.Equal(t, "url", url) - }) - } -} - -func jsonMarshal(obj interface{}) []byte { - res, err := json.Marshal(obj) - if err != nil { - panic(err) - } - return res -} - -func encodeToBytes(obj runtime.Object) []byte { - res, err := encode.Encode(obj, "json") - if err != nil { - panic(err) - } - return res -} - -type stringReadSeeker struct { - *strings.Reader -} - -func newStringReadSeeker(s string) *stringReadSeeker { - return &stringReadSeeker{ - Reader: strings.NewReader(s), - } -} - -func (srs *stringReadSeeker) Seek(offset int64, whence int) (int64, error) { - return 0, nil -} diff --git a/pkg/persistence/object_store.go b/pkg/persistence/object_store.go new file mode 100644 index 000000000..c2fd1be68 --- /dev/null +++ b/pkg/persistence/object_store.go @@ -0,0 +1,299 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package persistence + +import ( + "fmt" + "io" + "io/ioutil" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + kerrors "k8s.io/apimachinery/pkg/util/errors" + + arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" + "github.com/heptio/ark/pkg/cloudprovider" + "github.com/heptio/ark/pkg/generated/clientset/versioned/scheme" +) + +// BackupStore defines operations for creating, retrieving, and deleting +// Ark backup and restore data in/from a persistent backup store. +type BackupStore interface { + ListBackups() ([]*arkv1api.Backup, error) + + PutBackup(name string, metadata, contents, log io.Reader) error + GetBackupMetadata(name string) (*arkv1api.Backup, error) + GetBackupContents(name string) (io.ReadCloser, error) + DeleteBackup(name string) error + + PutRestoreLog(backup, restore string, log io.Reader) error + PutRestoreResults(backup, restore string, results io.Reader) error + + GetDownloadURL(backup string, target arkv1api.DownloadTarget) (string, error) +} + +const ( + // DownloadURLTTL is how long a download URL is valid for. + DownloadURLTTL = 10 * time.Minute + + backupMetadataFileFormatString = "%s/ark-backup.json" + backupFileFormatString = "%s/%s.tar.gz" + backupLogFileFormatString = "%s/%s-logs.gz" + restoreLogFileFormatString = "%s/restore-%s-logs.gz" + restoreResultsFileFormatString = "%s/restore-%s-results.gz" +) + +func getPrefix(prefix string) string { + if prefix == "" || strings.HasSuffix(prefix, "/") { + return prefix + } + + return prefix + "/" +} + +func getBackupMetadataKey(prefix, backup string) string { + return prefix + fmt.Sprintf(backupMetadataFileFormatString, backup) +} + +func getBackupContentsKey(prefix, backup string) string { + return prefix + fmt.Sprintf(backupFileFormatString, backup, backup) +} + +func getBackupLogKey(prefix, backup string) string { + return prefix + fmt.Sprintf(backupLogFileFormatString, backup, backup) +} + +func getRestoreLogKey(prefix, backup, restore string) string { + return prefix + fmt.Sprintf(restoreLogFileFormatString, backup, restore) +} + +func getRestoreResultsKey(prefix, backup, restore string) string { + return prefix + fmt.Sprintf(restoreResultsFileFormatString, backup, restore) +} + +type objectBackupStore struct { + objectStore cloudprovider.ObjectStore + bucket string + prefix string + logger logrus.FieldLogger +} + +// ObjectStoreGetter is a type that can get a cloudprovider.ObjectStore +// from a provider name. +type ObjectStoreGetter interface { + GetObjectStore(provider string) (cloudprovider.ObjectStore, error) +} + +func NewObjectBackupStore(location *arkv1api.BackupStorageLocation, objectStoreGetter ObjectStoreGetter, logger logrus.FieldLogger) (BackupStore, error) { + if location.Spec.ObjectStorage == nil { + return nil, errors.New("backup storage location does not use object storage") + } + + if location.Spec.Provider == "" { + return nil, errors.New("object storage provider name must not be empty") + } + + objectStore, err := objectStoreGetter.GetObjectStore(location.Spec.Provider) + if err != nil { + return nil, err + } + + // add the bucket name to the config map so that object stores can use + // it when initializing. The AWS object store uses this to determine the + // bucket's region when setting up its client. + if location.Spec.ObjectStorage != nil { + if location.Spec.Config == nil { + location.Spec.Config = make(map[string]string) + } + location.Spec.Config["bucket"] = location.Spec.ObjectStorage.Bucket + } + + if err := objectStore.Init(location.Spec.Config); err != nil { + return nil, err + } + + prefix := getPrefix(location.Spec.ObjectStorage.Prefix) + + log := logger.WithFields(logrus.Fields(map[string]interface{}{ + "bucket": location.Spec.ObjectStorage.Bucket, + "prefix": prefix, + })) + + return &objectBackupStore{ + objectStore: objectStore, + bucket: location.Spec.ObjectStorage.Bucket, + prefix: prefix, + logger: log, + }, nil +} + +func (s *objectBackupStore) ListBackups() ([]*arkv1api.Backup, error) { + prefixes, err := s.objectStore.ListCommonPrefixes(s.bucket, s.prefix, "/") + if err != nil { + return nil, err + } + if len(prefixes) == 0 { + return []*arkv1api.Backup{}, nil + } + + output := make([]*arkv1api.Backup, 0, len(prefixes)) + + for _, prefix := range prefixes { + // values returned from a call to cloudprovider.ObjectStore's + // ListcommonPrefixes method return the *full* prefix, inclusive + // of s.prefix, and include the delimiter ("/") as a suffix. Trim + // each of those off to get the backup name. + backupName := strings.TrimSuffix(strings.TrimPrefix(prefix, s.prefix), "/") + + backup, err := s.GetBackupMetadata(backupName) + if err != nil { + s.logger.WithError(err).WithField("dir", backupName).Error("Error reading backup directory") + continue + } + + output = append(output, backup) + } + + return output, nil +} + +func (s *objectBackupStore) PutBackup(name string, metadata io.Reader, contents io.Reader, log io.Reader) error { + if err := seekAndPutObject(s.objectStore, s.bucket, getBackupLogKey(s.prefix, name), log); err != nil { + // Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the + // backup's status. + s.logger.WithError(err).WithField("backup", name).Error("Error uploading log file") + } + + if metadata == nil { + // If we don't have metadata, something failed, and there's no point in continuing. An object + // storage bucket that is missing the metadata file can't be restored, nor can its logs be + // viewed. + return nil + } + + if err := seekAndPutObject(s.objectStore, s.bucket, getBackupMetadataKey(s.prefix, name), metadata); err != nil { + // failure to upload metadata file is a hard-stop + return err + } + + if err := seekAndPutObject(s.objectStore, s.bucket, getBackupContentsKey(s.prefix, name), contents); err != nil { + deleteErr := s.objectStore.DeleteObject(s.bucket, getBackupMetadataKey(s.prefix, name)) + return kerrors.NewAggregate([]error{err, deleteErr}) + } + + return nil +} + +func (s *objectBackupStore) GetBackupMetadata(name string) (*arkv1api.Backup, error) { + key := getBackupMetadataKey(s.prefix, name) + + res, err := s.objectStore.GetObject(s.bucket, key) + if err != nil { + return nil, err + } + defer res.Close() + + data, err := ioutil.ReadAll(res) + if err != nil { + return nil, errors.WithStack(err) + } + + decoder := scheme.Codecs.UniversalDecoder(arkv1api.SchemeGroupVersion) + obj, _, err := decoder.Decode(data, nil, nil) + if err != nil { + return nil, errors.WithStack(err) + } + + backupObj, ok := obj.(*arkv1api.Backup) + if !ok { + return nil, errors.Errorf("unexpected type for %s/%s: %T", s.bucket, key, obj) + } + + return backupObj, nil + +} + +func (s *objectBackupStore) GetBackupContents(name string) (io.ReadCloser, error) { + return s.objectStore.GetObject(s.bucket, getBackupContentsKey(s.prefix, name)) +} + +func (s *objectBackupStore) DeleteBackup(name string) error { + objects, err := s.objectStore.ListObjects(s.bucket, s.prefix+name+"/") + if err != nil { + return err + } + + var errs []error + for _, key := range objects { + s.logger.WithFields(logrus.Fields{ + "key": key, + }).Debug("Trying to delete object") + if err := s.objectStore.DeleteObject(s.bucket, key); err != nil { + errs = append(errs, err) + } + } + + return errors.WithStack(kerrors.NewAggregate(errs)) +} + +func (s *objectBackupStore) PutRestoreLog(backup string, restore string, log io.Reader) error { + return s.objectStore.PutObject(s.bucket, getRestoreLogKey(s.prefix, backup, restore), log) +} + +func (s *objectBackupStore) PutRestoreResults(backup string, restore string, results io.Reader) error { + return s.objectStore.PutObject(s.bucket, getRestoreResultsKey(s.prefix, backup, restore), results) +} + +func (s *objectBackupStore) GetDownloadURL(backup string, target arkv1api.DownloadTarget) (string, error) { + switch target.Kind { + case arkv1api.DownloadTargetKindBackupContents: + return s.objectStore.CreateSignedURL(s.bucket, getBackupContentsKey(s.prefix, backup), DownloadURLTTL) + case arkv1api.DownloadTargetKindBackupLog: + return s.objectStore.CreateSignedURL(s.bucket, getBackupLogKey(s.prefix, backup), DownloadURLTTL) + case arkv1api.DownloadTargetKindRestoreLog: + return s.objectStore.CreateSignedURL(s.bucket, getRestoreLogKey(s.prefix, backup, target.Name), DownloadURLTTL) + case arkv1api.DownloadTargetKindRestoreResults: + return s.objectStore.CreateSignedURL(s.bucket, getRestoreResultsKey(s.prefix, backup, target.Name), DownloadURLTTL) + default: + return "", errors.Errorf("unsupported download target kind %q", target.Kind) + } +} + +func seekToBeginning(r io.Reader) error { + seeker, ok := r.(io.Seeker) + if !ok { + return nil + } + + _, err := seeker.Seek(0, 0) + return err +} + +func seekAndPutObject(objectStore cloudprovider.ObjectStore, bucket, key string, file io.Reader) error { + if file == nil { + return nil + } + + if err := seekToBeginning(file); err != nil { + return errors.WithStack(err) + } + + return objectStore.PutObject(bucket, key, file) +} diff --git a/pkg/persistence/object_store_test.go b/pkg/persistence/object_store_test.go new file mode 100644 index 000000000..6f2e27cbf --- /dev/null +++ b/pkg/persistence/object_store_test.go @@ -0,0 +1,403 @@ +/* +Copyright 2017 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package persistence + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "sort" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + api "github.com/heptio/ark/pkg/apis/ark/v1" + "github.com/heptio/ark/pkg/cloudprovider" + "github.com/heptio/ark/pkg/util/encode" + arktest "github.com/heptio/ark/pkg/util/test" +) + +type objectBackupStoreTestHarness struct { + // embedded to reduce verbosity when calling methods + *objectBackupStore + + objectStore *cloudprovider.InMemoryObjectStore + bucket, prefix string +} + +func newObjectBackupStoreTestHarness(bucket, prefix string) *objectBackupStoreTestHarness { + objectStore := cloudprovider.NewInMemoryObjectStore(bucket) + + return &objectBackupStoreTestHarness{ + objectBackupStore: &objectBackupStore{ + objectStore: objectStore, + bucket: bucket, + prefix: prefix, + logger: arktest.NewLogger(), + }, + objectStore: objectStore, + bucket: bucket, + prefix: prefix, + } +} + +func TestListBackups(t *testing.T) { + tests := []struct { + name string + prefix string + storageData cloudprovider.BucketData + expectedRes []*api.Backup + expectedErr string + }{ + { + name: "normal case", + storageData: map[string][]byte{ + "backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}), + "backup-2/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-2"}}), + }, + expectedRes: []*api.Backup{ + { + TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "backup-2"}, + }, + }, + }, + { + name: "normal case with backup store prefix", + prefix: "ark-backups/", + storageData: map[string][]byte{ + "ark-backups/backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}), + "ark-backups/backup-2/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-2"}}), + }, + expectedRes: []*api.Backup{ + { + TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "backup-2"}, + }, + }, + }, + { + name: "backup that can't be decoded is ignored", + storageData: map[string][]byte{ + "backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}), + "backup-2/ark-backup.json": []byte("this is not valid backup JSON"), + }, + expectedRes: []*api.Backup{ + { + TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + harness := newObjectBackupStoreTestHarness("foo", tc.prefix) + + for key, obj := range tc.storageData { + require.NoError(t, harness.objectStore.PutObject(harness.bucket, key, bytes.NewReader(obj))) + } + + res, err := harness.ListBackups() + + arktest.AssertErrorMatches(t, tc.expectedErr, err) + + getComparer := func(obj []*api.Backup) func(i, j int) bool { + return func(i, j int) bool { + switch strings.Compare(obj[i].Namespace, obj[j].Namespace) { + case -1: + return true + case 1: + return false + default: + // namespaces are the same: compare by name + return obj[i].Name < obj[j].Name + } + } + } + + sort.Slice(tc.expectedRes, getComparer(tc.expectedRes)) + sort.Slice(res, getComparer(res)) + + assert.Equal(t, tc.expectedRes, res) + }) + } +} + +func TestPutBackup(t *testing.T) { + tests := []struct { + name string + prefix string + metadata io.Reader + contents io.Reader + log io.Reader + expectedErr string + expectedKeys []string + }{ + { + name: "normal case", + metadata: newStringReadSeeker("metadata"), + contents: newStringReadSeeker("contents"), + log: newStringReadSeeker("log"), + expectedErr: "", + expectedKeys: []string{"backup-1/ark-backup.json", "backup-1/backup-1.tar.gz", "backup-1/backup-1-logs.gz"}, + }, + { + name: "normal case with backup store prefix", + prefix: "prefix-1/", + metadata: newStringReadSeeker("metadata"), + contents: newStringReadSeeker("contents"), + log: newStringReadSeeker("log"), + expectedErr: "", + expectedKeys: []string{"prefix-1/backup-1/ark-backup.json", "prefix-1/backup-1/backup-1.tar.gz", "prefix-1/backup-1/backup-1-logs.gz"}, + }, + { + name: "error on metadata upload does not upload data", + metadata: new(errorReader), + contents: newStringReadSeeker("contents"), + log: newStringReadSeeker("log"), + expectedErr: "error readers return errors", + expectedKeys: []string{"backup-1/backup-1-logs.gz"}, + }, + { + name: "error on data upload deletes metadata", + metadata: newStringReadSeeker("metadata"), + contents: new(errorReader), + log: newStringReadSeeker("log"), + expectedErr: "error readers return errors", + expectedKeys: []string{"backup-1/backup-1-logs.gz"}, + }, + { + name: "error on log upload is ok", + metadata: newStringReadSeeker("foo"), + contents: newStringReadSeeker("bar"), + log: new(errorReader), + expectedErr: "", + expectedKeys: []string{"backup-1/ark-backup.json", "backup-1/backup-1.tar.gz"}, + }, + { + name: "don't upload data when metadata is nil", + metadata: nil, + contents: newStringReadSeeker("contents"), + log: newStringReadSeeker("log"), + expectedErr: "", + expectedKeys: []string{"backup-1/backup-1-logs.gz"}, + }, + } + + for _, tc := range tests { + harness := newObjectBackupStoreTestHarness("foo", tc.prefix) + + err := harness.PutBackup("backup-1", tc.metadata, tc.contents, tc.log) + + arktest.AssertErrorMatches(t, tc.expectedErr, err) + assert.Len(t, harness.objectStore.Data[harness.bucket], len(tc.expectedKeys)) + for _, key := range tc.expectedKeys { + assert.Contains(t, harness.objectStore.Data[harness.bucket], key) + } + } +} + +func TestGetBackupContents(t *testing.T) { + harness := newObjectBackupStoreTestHarness("test-bucket", "") + + harness.objectStore.PutObject(harness.bucket, "test-backup/test-backup.tar.gz", newStringReadSeeker("foo")) + + rc, err := harness.GetBackupContents("test-backup") + require.NoError(t, err) + require.NotNil(t, rc) + + data, err := ioutil.ReadAll(rc) + require.NoError(t, err) + assert.Equal(t, "foo", string(data)) +} + +func TestDeleteBackup(t *testing.T) { + tests := []struct { + name string + prefix string + listObjectsError error + deleteErrors []error + expectedErr string + }{ + { + name: "normal case", + }, + { + name: "normal case with backup store prefix", + prefix: "ark-backups/", + }, + { + name: "some delete errors, do as much as we can", + deleteErrors: []error{errors.New("a"), nil, errors.New("c")}, + expectedErr: "[a, c]", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + objectStore := new(arktest.ObjectStore) + backupStore := &objectBackupStore{ + objectStore: objectStore, + bucket: "test-bucket", + prefix: test.prefix, + logger: arktest.NewLogger(), + } + defer objectStore.AssertExpectations(t) + + objects := []string{test.prefix + "bak/ark-backup.json", test.prefix + "bak/bak.tar.gz", test.prefix + "bak/bak.log.gz"} + + objectStore.On("ListObjects", backupStore.bucket, test.prefix+"bak/").Return(objects, test.listObjectsError) + for i, obj := range objects { + var err error + if i < len(test.deleteErrors) { + err = test.deleteErrors[i] + } + + objectStore.On("DeleteObject", backupStore.bucket, obj).Return(err) + } + + err := backupStore.DeleteBackup("bak") + + arktest.AssertErrorMatches(t, test.expectedErr, err) + }) + } +} + +func TestGetDownloadURL(t *testing.T) { + tests := []struct { + name string + targetKind api.DownloadTargetKind + targetName string + directory string + prefix string + expectedKey string + }{ + { + name: "backup contents", + targetKind: api.DownloadTargetKindBackupContents, + targetName: "my-backup", + directory: "my-backup", + expectedKey: "my-backup/my-backup.tar.gz", + }, + { + name: "backup log", + targetKind: api.DownloadTargetKindBackupLog, + targetName: "my-backup", + directory: "my-backup", + expectedKey: "my-backup/my-backup-logs.gz", + }, + { + name: "scheduled backup contents", + targetKind: api.DownloadTargetKindBackupContents, + targetName: "my-backup-20170913154901", + directory: "my-backup-20170913154901", + expectedKey: "my-backup-20170913154901/my-backup-20170913154901.tar.gz", + }, + { + name: "scheduled backup log", + targetKind: api.DownloadTargetKindBackupLog, + targetName: "my-backup-20170913154901", + directory: "my-backup-20170913154901", + expectedKey: "my-backup-20170913154901/my-backup-20170913154901-logs.gz", + }, + { + name: "backup contents with backup store prefix", + targetKind: api.DownloadTargetKindBackupContents, + targetName: "my-backup", + directory: "my-backup", + prefix: "ark-backups/", + expectedKey: "ark-backups/my-backup/my-backup.tar.gz", + }, + { + name: "restore log", + targetKind: api.DownloadTargetKindRestoreLog, + targetName: "b-20170913154901", + directory: "b", + expectedKey: "b/restore-b-20170913154901-logs.gz", + }, + { + name: "restore results", + targetKind: api.DownloadTargetKindRestoreResults, + targetName: "b-20170913154901", + directory: "b", + expectedKey: "b/restore-b-20170913154901-results.gz", + }, + { + name: "restore results - backup has multiple dashes (e.g. restore of scheduled backup)", + targetKind: api.DownloadTargetKindRestoreResults, + targetName: "b-cool-20170913154901-20170913154902", + directory: "b-cool-20170913154901", + expectedKey: "b-cool-20170913154901/restore-b-cool-20170913154901-20170913154902-results.gz", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + harness := newObjectBackupStoreTestHarness("test-bucket", test.prefix) + + require.NoError(t, harness.objectStore.PutObject("test-bucket", test.expectedKey, newStringReadSeeker("foo"))) + + url, err := harness.GetDownloadURL(test.directory, api.DownloadTarget{Kind: test.targetKind, Name: test.targetName}) + require.NoError(t, err) + assert.Equal(t, "a-url", url) + }) + } +} + +func encodeToBytes(obj runtime.Object) []byte { + res, err := encode.Encode(obj, "json") + if err != nil { + panic(err) + } + return res +} + +type stringReadSeeker struct { + *strings.Reader +} + +func newStringReadSeeker(s string) *stringReadSeeker { + return &stringReadSeeker{ + Reader: strings.NewReader(s), + } +} + +func (srs *stringReadSeeker) Seek(offset int64, whence int) (int64, error) { + return 0, nil +} + +type errorReader struct{} + +func (r *errorReader) Read([]byte) (int, error) { + return 0, errors.New("error readers return errors") +} diff --git a/pkg/plugin/generated/ObjectStore.pb.go b/pkg/plugin/generated/ObjectStore.pb.go index 38f2e9904..63282cf00 100644 --- a/pkg/plugin/generated/ObjectStore.pb.go +++ b/pkg/plugin/generated/ObjectStore.pb.go @@ -109,6 +109,7 @@ type ListCommonPrefixesRequest struct { Plugin string `protobuf:"bytes,1,opt,name=plugin" json:"plugin,omitempty"` Bucket string `protobuf:"bytes,2,opt,name=bucket" json:"bucket,omitempty"` Delimiter string `protobuf:"bytes,3,opt,name=delimiter" json:"delimiter,omitempty"` + Prefix string `protobuf:"bytes,4,opt,name=prefix" json:"prefix,omitempty"` } func (m *ListCommonPrefixesRequest) Reset() { *m = ListCommonPrefixesRequest{} } @@ -137,6 +138,13 @@ func (m *ListCommonPrefixesRequest) GetDelimiter() string { return "" } +func (m *ListCommonPrefixesRequest) GetPrefix() string { + if m != nil { + return m.Prefix + } + return "" +} + type ListCommonPrefixesResponse struct { Prefixes []string `protobuf:"bytes,1,rep,name=prefixes" json:"prefixes,omitempty"` } @@ -637,34 +645,35 @@ var _ObjectStore_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("ObjectStore.proto", fileDescriptor2) } var fileDescriptor2 = []byte{ - // 459 bytes of a gzipped FileDescriptorProto + // 468 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x51, 0x8b, 0xd3, 0x40, 0x10, 0x26, 0x26, 0x1e, 0x66, 0xae, 0x60, 0x9c, 0x83, 0x1a, 0x73, 0x2a, 0x75, 0x51, 0xa8, 0x08, 0xe5, 0xd0, 0x17, 0x1f, 0x7c, 0x10, 0x4f, 0x11, 0xa1, 0xe0, 0x91, 0x2a, 0xfa, 0xe0, 0x4b, 0x7a, - 0x19, 0x7b, 0xb1, 0x69, 0x12, 0x37, 0x13, 0x30, 0xff, 0xc0, 0x9f, 0x2d, 0xbb, 0x59, 0xeb, 0xa6, - 0xd7, 0xf3, 0xa0, 0xf4, 0x6d, 0xe6, 0xdb, 0xf9, 0x66, 0xbe, 0xec, 0x7e, 0x13, 0xb8, 0xf3, 0x71, - 0xfe, 0x83, 0xce, 0x79, 0xc6, 0xa5, 0xa4, 0x49, 0x25, 0x4b, 0x2e, 0xd1, 0x5f, 0x50, 0x41, 0x32, - 0x61, 0x4a, 0xa3, 0xc1, 0xec, 0x22, 0x91, 0x94, 0x76, 0x07, 0xe2, 0x02, 0x82, 0xb3, 0x86, 0x3b, - 0x42, 0x4c, 0x3f, 0x1b, 0xaa, 0x19, 0x87, 0x70, 0x50, 0xe5, 0xcd, 0x22, 0x2b, 0x42, 0x67, 0xe4, - 0x8c, 0xfd, 0xd8, 0x64, 0x0a, 0x9f, 0x37, 0xe7, 0x4b, 0xe2, 0xf0, 0x46, 0x87, 0x77, 0x19, 0x06, - 0xe0, 0x2e, 0xa9, 0x0d, 0x5d, 0x0d, 0xaa, 0x10, 0x11, 0xbc, 0x79, 0x99, 0xb6, 0xa1, 0x37, 0x72, - 0xc6, 0x83, 0x58, 0xc7, 0xe2, 0x13, 0x04, 0xef, 0x69, 0xdf, 0x93, 0xc4, 0x31, 0xdc, 0x7c, 0xd3, - 0x32, 0xd5, 0x6a, 0x64, 0x9a, 0x70, 0xa2, 0x1b, 0x0d, 0x62, 0x1d, 0x8b, 0x0c, 0xee, 0x4d, 0xb3, - 0x9a, 0x4f, 0xcb, 0xd5, 0xaa, 0x2c, 0xce, 0x24, 0x7d, 0xcf, 0x7e, 0x51, 0xbd, 0xeb, 0xec, 0xfb, - 0xe0, 0xa7, 0x94, 0x67, 0xab, 0x8c, 0x49, 0x1a, 0x05, 0xff, 0x00, 0xf1, 0x12, 0xa2, 0x6d, 0xa3, - 0xea, 0xaa, 0x2c, 0x6a, 0xc2, 0x08, 0x6e, 0x55, 0x06, 0x0b, 0x9d, 0x91, 0x3b, 0xf6, 0xe3, 0x75, - 0x2e, 0xbe, 0x01, 0x2a, 0x66, 0x77, 0x31, 0x3b, 0xab, 0x53, 0xf5, 0xba, 0xa3, 0x91, 0x66, 0x32, - 0xf1, 0x14, 0x8e, 0x7a, 0xdd, 0x8d, 0x20, 0x04, 0x6f, 0x49, 0xed, 0x5f, 0x31, 0x3a, 0x16, 0x5f, - 0xe0, 0xe8, 0x2d, 0xe5, 0xc4, 0xb4, 0xef, 0x37, 0xca, 0x61, 0x78, 0x2a, 0x29, 0x61, 0x9a, 0x65, - 0x8b, 0x82, 0xd2, 0xcf, 0xf1, 0x74, 0x7f, 0x4e, 0x0b, 0xc0, 0x65, 0xce, 0xb5, 0xd1, 0xdc, 0x58, - 0x85, 0xe2, 0x19, 0xdc, 0xbd, 0x34, 0xcd, 0x7c, 0x75, 0x00, 0x6e, 0x23, 0x73, 0x33, 0x4b, 0x85, - 0xcf, 0x7f, 0x7b, 0x70, 0x68, 0x6d, 0x0b, 0x9e, 0x80, 0xf7, 0xa1, 0xc8, 0x18, 0x87, 0x93, 0xf5, - 0xc2, 0x4c, 0x14, 0x60, 0x04, 0x47, 0x81, 0x85, 0xbf, 0x5b, 0x55, 0xdc, 0xe2, 0x2b, 0xf0, 0xd7, - 0x0b, 0x84, 0xc7, 0xd6, 0xf1, 0xe6, 0x5a, 0x5d, 0xe6, 0x8e, 0x1d, 0xc5, 0x5e, 0x2f, 0x45, 0x8f, - 0xbd, 0xb9, 0x2a, 0x3d, 0xb6, 0x76, 0xfc, 0x89, 0x83, 0x49, 0x67, 0x9d, 0xbe, 0xe9, 0xf0, 0xb1, - 0x55, 0x79, 0xa5, 0xfd, 0xa3, 0x27, 0xd7, 0x54, 0x99, 0x2b, 0x9b, 0xc2, 0xa1, 0xe5, 0x1f, 0x7c, - 0xb0, 0xc1, 0xea, 0xbb, 0x36, 0x7a, 0x78, 0xd5, 0xb1, 0xe9, 0xf6, 0x1a, 0x06, 0xb6, 0xc5, 0xd0, - 0xae, 0xdf, 0xe2, 0xbd, 0x2d, 0xd7, 0xfd, 0x15, 0x6e, 0x6f, 0xbc, 0x2e, 0x3e, 0xb2, 0x8a, 0xb6, - 0xfb, 0x2c, 0x12, 0xff, 0x2b, 0xe9, 0xb4, 0xcd, 0x0f, 0xf4, 0x0f, 0xf1, 0xc5, 0x9f, 0x00, 0x00, - 0x00, 0xff, 0xff, 0x59, 0xaf, 0x2a, 0xa0, 0x3e, 0x05, 0x00, 0x00, + 0x19, 0x7b, 0xb1, 0x69, 0x12, 0x37, 0x13, 0x30, 0x8f, 0xbe, 0xf9, 0xb3, 0x65, 0x37, 0x6b, 0x6f, + 0xd3, 0xeb, 0x79, 0x70, 0xf4, 0x6d, 0x66, 0x76, 0xbe, 0x99, 0x2f, 0xbb, 0xdf, 0x17, 0xb8, 0xf3, + 0x71, 0xfe, 0x83, 0x4e, 0x79, 0xc6, 0xa5, 0xa4, 0x49, 0x25, 0x4b, 0x2e, 0xd1, 0x5f, 0x50, 0x41, + 0x32, 0x61, 0x4a, 0xa3, 0xc1, 0xec, 0x2c, 0x91, 0x94, 0x76, 0x07, 0xe2, 0x0c, 0x82, 0x93, 0x86, + 0x3b, 0x40, 0x4c, 0x3f, 0x1b, 0xaa, 0x19, 0x87, 0xb0, 0x57, 0xe5, 0xcd, 0x22, 0x2b, 0x42, 0x67, + 0xe4, 0x8c, 0xfd, 0xd8, 0x64, 0xaa, 0x3e, 0x6f, 0x4e, 0x97, 0xc4, 0xe1, 0x8d, 0xae, 0xde, 0x65, + 0x18, 0x80, 0xbb, 0xa4, 0x36, 0x74, 0x75, 0x51, 0x85, 0x88, 0xe0, 0xcd, 0xcb, 0xb4, 0x0d, 0xbd, + 0x91, 0x33, 0x1e, 0xc4, 0x3a, 0x16, 0x9f, 0x20, 0x78, 0x4f, 0xbb, 0xde, 0x24, 0x0e, 0xe1, 0xe6, + 0x9b, 0x96, 0xa9, 0x56, 0x2b, 0xd3, 0x84, 0x13, 0x3d, 0x68, 0x10, 0xeb, 0x58, 0xfc, 0x76, 0xe0, + 0xde, 0x34, 0xab, 0xf9, 0xb8, 0x5c, 0xad, 0xca, 0xe2, 0x44, 0xd2, 0xf7, 0xec, 0x17, 0xd5, 0xd7, + 0x5d, 0x7e, 0x1f, 0xfc, 0x94, 0xf2, 0x6c, 0x95, 0x31, 0x49, 0x43, 0xe1, 0xbc, 0xa0, 0xa7, 0xe9, + 0x05, 0xfa, 0xa3, 0xd5, 0x34, 0x9d, 0x89, 0x97, 0x10, 0x6d, 0xa3, 0x50, 0x57, 0x65, 0x51, 0x13, + 0x46, 0x70, 0xab, 0x32, 0xb5, 0xd0, 0x19, 0xb9, 0x63, 0x3f, 0x5e, 0xe7, 0xe2, 0x1b, 0xa0, 0x42, + 0x76, 0x37, 0x76, 0x6d, 0xd6, 0xe7, 0xbc, 0xdc, 0x1e, 0xaf, 0xa7, 0x70, 0xd0, 0x9b, 0x6e, 0x08, + 0x21, 0x78, 0x4b, 0x6a, 0xff, 0x91, 0xd1, 0xb1, 0xf8, 0x02, 0x07, 0x6f, 0x29, 0x27, 0xa6, 0x5d, + 0x3f, 0x5e, 0x0e, 0xc3, 0x63, 0x49, 0x09, 0xd3, 0x2c, 0x5b, 0x14, 0x94, 0x7e, 0x8e, 0xa7, 0xbb, + 0x93, 0x60, 0x00, 0x2e, 0x73, 0xae, 0x1f, 0xc3, 0x8d, 0x55, 0x28, 0x9e, 0xc1, 0xdd, 0x0b, 0xdb, + 0xcc, 0x57, 0x07, 0xe0, 0x36, 0x32, 0x37, 0xbb, 0x54, 0xf8, 0xfc, 0x8f, 0x07, 0xfb, 0x96, 0x8d, + 0xf0, 0x08, 0xbc, 0x0f, 0x45, 0xc6, 0x38, 0x9c, 0xac, 0x9d, 0x34, 0x51, 0x05, 0x43, 0x38, 0x0a, + 0xac, 0xfa, 0xbb, 0x55, 0xc5, 0x2d, 0xbe, 0x02, 0x7f, 0xed, 0x2c, 0x3c, 0xb4, 0x8e, 0x37, 0xfd, + 0x76, 0x11, 0x3b, 0x76, 0x14, 0x7a, 0xed, 0x96, 0x1e, 0x7a, 0xd3, 0x43, 0x3d, 0xb4, 0xb6, 0xc2, + 0x91, 0x83, 0x49, 0x27, 0x9d, 0xbe, 0xe8, 0xf0, 0xb1, 0xd5, 0x79, 0xa9, 0x2d, 0xa2, 0x27, 0x57, + 0x74, 0x99, 0x2b, 0x9b, 0xc2, 0xbe, 0xa5, 0x1f, 0x7c, 0xb0, 0x81, 0xea, 0xab, 0x36, 0x7a, 0x78, + 0xd9, 0xb1, 0x99, 0xf6, 0x1a, 0x06, 0xb6, 0xc4, 0xd0, 0xee, 0xdf, 0xa2, 0xbd, 0x2d, 0xd7, 0xfd, + 0x15, 0x6e, 0x6f, 0xbc, 0x2e, 0x3e, 0xb2, 0x9a, 0xb6, 0xeb, 0x2c, 0x12, 0xff, 0x6b, 0xe9, 0xb8, + 0xcd, 0xf7, 0xf4, 0x9f, 0xf2, 0xc5, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x54, 0xac, 0xfe, 0xa7, + 0x57, 0x05, 0x00, 0x00, } diff --git a/pkg/plugin/object_store.go b/pkg/plugin/object_store.go index 918fac2ef..aac5f6a91 100644 --- a/pkg/plugin/object_store.go +++ b/pkg/plugin/object_store.go @@ -132,10 +132,17 @@ func (c *ObjectStoreGRPCClient) GetObject(bucket, key string) (io.ReadCloser, er } // ListCommonPrefixes gets a list of all object key prefixes that come -// before the provided delimiter (this is often used to simulate a directory -// hierarchy in object storage). -func (c *ObjectStoreGRPCClient) ListCommonPrefixes(bucket, delimiter string) ([]string, error) { - res, err := c.grpcClient.ListCommonPrefixes(context.Background(), &proto.ListCommonPrefixesRequest{Plugin: c.plugin, Bucket: bucket, Delimiter: delimiter}) +// after the provided prefix and before the provided delimiter (this is +// often used to simulate a directory hierarchy in object storage). +func (c *ObjectStoreGRPCClient) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) { + req := &proto.ListCommonPrefixesRequest{ + Plugin: c.plugin, + Bucket: bucket, + Prefix: prefix, + Delimiter: delimiter, + } + + res, err := c.grpcClient.ListCommonPrefixes(context.Background(), req) if err != nil { return nil, err } @@ -294,16 +301,16 @@ func (s *ObjectStoreGRPCServer) GetObject(req *proto.GetObjectRequest, stream pr } } -// ListCommonPrefixes gets a list of all object key prefixes that come -// before the provided delimiter (this is often used to simulate a directory -// hierarchy in object storage). +// ListCommonPrefixes gets a list of all object key prefixes that start with +// the specified prefix and stop at the next instance of the provided delimiter +// (this is often used to simulate a directory hierarchy in object storage). func (s *ObjectStoreGRPCServer) ListCommonPrefixes(ctx context.Context, req *proto.ListCommonPrefixesRequest) (*proto.ListCommonPrefixesResponse, error) { impl, err := s.getImpl(req.Plugin) if err != nil { return nil, err } - prefixes, err := impl.ListCommonPrefixes(req.Bucket, req.Delimiter) + prefixes, err := impl.ListCommonPrefixes(req.Bucket, req.Prefix, req.Delimiter) if err != nil { return nil, err } diff --git a/pkg/plugin/proto/ObjectStore.proto b/pkg/plugin/proto/ObjectStore.proto index ef5461fb7..7a2d43a40 100644 --- a/pkg/plugin/proto/ObjectStore.proto +++ b/pkg/plugin/proto/ObjectStore.proto @@ -24,6 +24,7 @@ message ListCommonPrefixesRequest { string plugin = 1; string bucket = 2; string delimiter = 3; + string prefix = 4; } message ListCommonPrefixesResponse { diff --git a/pkg/plugin/restartable_object_store.go b/pkg/plugin/restartable_object_store.go index 34b8388f4..dc691a134 100644 --- a/pkg/plugin/restartable_object_store.go +++ b/pkg/plugin/restartable_object_store.go @@ -127,12 +127,12 @@ func (r *restartableObjectStore) GetObject(bucket string, key string) (io.ReadCl } // ListCommonPrefixes restarts the plugin's process if needed, then delegates the call. -func (r *restartableObjectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) { +func (r *restartableObjectStore) ListCommonPrefixes(bucket string, prefix string, delimiter string) ([]string, error) { delegate, err := r.getDelegate() if err != nil { return nil, err } - return delegate.ListCommonPrefixes(bucket, delimiter) + return delegate.ListCommonPrefixes(bucket, prefix, delimiter) } // ListObjects restarts the plugin's process if needed, then delegates the call. diff --git a/pkg/plugin/restartable_object_store_test.go b/pkg/plugin/restartable_object_store_test.go index 76811a905..206866063 100644 --- a/pkg/plugin/restartable_object_store_test.go +++ b/pkg/plugin/restartable_object_store_test.go @@ -208,7 +208,7 @@ func TestRestartableObjectStoreDelegatedFunctions(t *testing.T) { }, restartableDelegateTest{ function: "ListCommonPrefixes", - inputs: []interface{}{"bucket", "delimeter"}, + inputs: []interface{}{"bucket", "prefix", "delimiter"}, expectedErrorOutputs: []interface{}{([]string)(nil), errors.Errorf("reset error")}, expectedDelegateOutputs: []interface{}{[]string{"a", "b"}, errors.Errorf("delegate error")}, }, diff --git a/pkg/util/test/comparisons.go b/pkg/util/test/comparisons.go index 5e15e0b09..d6daf9cf3 100644 --- a/pkg/util/test/comparisons.go +++ b/pkg/util/test/comparisons.go @@ -110,3 +110,13 @@ func AssertDeepEqual(t *testing.T, expected, actual interface{}) bool { return true } + +// AssertErrorMatches asserts that if expected is the empty string, actual +// is nil, otherwise, that actual's error string matches expected. +func AssertErrorMatches(t *testing.T, expected string, actual error) bool { + if expected != "" { + return assert.EqualError(t, actual, expected) + } + + return assert.NoError(t, actual) +} diff --git a/pkg/util/test/object_store.go b/pkg/util/test/object_store.go index b8c8ccfb9..3a7f9104e 100644 --- a/pkg/util/test/object_store.go +++ b/pkg/util/test/object_store.go @@ -98,13 +98,13 @@ func (_m *ObjectStore) Init(config map[string]string) error { return r0 } -// ListCommonPrefixes provides a mock function with given fields: bucket, delimiter -func (_m *ObjectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) { - ret := _m.Called(bucket, delimiter) +// ListCommonPrefixes provides a mock function with given fields: bucket, prefix, delimiter +func (_m *ObjectStore) ListCommonPrefixes(bucket string, prefix string, delimiter string) ([]string, error) { + ret := _m.Called(bucket, prefix, delimiter) var r0 []string - if rf, ok := ret.Get(0).(func(string, string) []string); ok { - r0 = rf(bucket, delimiter) + if rf, ok := ret.Get(0).(func(string, string, string) []string); ok { + r0 = rf(bucket, prefix, delimiter) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]string) @@ -112,8 +112,8 @@ func (_m *ObjectStore) ListCommonPrefixes(bucket string, delimiter string) ([]st } var r1 error - if rf, ok := ret.Get(1).(func(string, string) error); ok { - r1 = rf(bucket, delimiter) + if rf, ok := ret.Get(1).(func(string, string, string) error); ok { + r1 = rf(bucket, prefix, delimiter) } else { r1 = ret.Error(1) }