add a BackupStore to pkg/persistence that supports prefixes

Signed-off-by: Steve Kriss <steve@heptio.com>
pull/800/head
Steve Kriss 2018-08-20 16:29:54 -07:00
parent af64069d65
commit f0edf7335f
28 changed files with 1391 additions and 1068 deletions

View File

@ -116,7 +116,7 @@ func (o *objectStore) Init(config map[string]string) error {
return nil return nil
} }
func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error { func (o *objectStore) PutObject(bucket, key string, body io.Reader) error {
req := &s3manager.UploadInput{ req := &s3manager.UploadInput{
Bucket: &bucket, Bucket: &bucket,
Key: &key, Key: &key,
@ -134,7 +134,7 @@ func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error
return errors.Wrapf(err, "error putting object %s", key) return errors.Wrapf(err, "error putting object %s", key)
} }
func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error) { func (o *objectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
req := &s3.GetObjectInput{ req := &s3.GetObjectInput{
Bucket: &bucket, Bucket: &bucket,
Key: &key, Key: &key,
@ -148,9 +148,10 @@ func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error
return res.Body, nil return res.Body, nil
} }
func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) { func (o *objectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
req := &s3.ListObjectsV2Input{ req := &s3.ListObjectsV2Input{
Bucket: &bucket, Bucket: &bucket,
Prefix: &prefix,
Delimiter: &delimiter, Delimiter: &delimiter,
} }
@ -161,7 +162,6 @@ func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]str
} }
return !lastPage return !lastPage
}) })
if err != nil { if err != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
@ -190,7 +190,7 @@ func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) {
return ret, nil return ret, nil
} }
func (o *objectStore) DeleteObject(bucket string, key string) error { func (o *objectStore) DeleteObject(bucket, key string) error {
req := &s3.DeleteObjectInput{ req := &s3.DeleteObjectInput{
Bucket: &bucket, Bucket: &bucket,
Key: &key, Key: &key,

View File

@ -119,7 +119,7 @@ func (o *objectStore) Init(config map[string]string) error {
return nil return nil
} }
func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error { func (o *objectStore) PutObject(bucket, key string, body io.Reader) error {
container, err := getContainerReference(o.blobClient, bucket) container, err := getContainerReference(o.blobClient, bucket)
if err != nil { if err != nil {
return err return err
@ -133,7 +133,7 @@ func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error
return errors.WithStack(blob.CreateBlockBlobFromReader(body, nil)) return errors.WithStack(blob.CreateBlockBlobFromReader(body, nil))
} }
func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error) { func (o *objectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
container, err := getContainerReference(o.blobClient, bucket) container, err := getContainerReference(o.blobClient, bucket)
if err != nil { if err != nil {
return nil, err return nil, err
@ -152,13 +152,14 @@ func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error
return res, nil return res, nil
} }
func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) { func (o *objectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
container, err := getContainerReference(o.blobClient, bucket) container, err := getContainerReference(o.blobClient, bucket)
if err != nil { if err != nil {
return nil, err return nil, err
} }
params := storage.ListBlobsParameters{ params := storage.ListBlobsParameters{
Prefix: prefix,
Delimiter: delimiter, Delimiter: delimiter,
} }
@ -167,14 +168,7 @@ func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]str
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
// Azure returns prefixes inclusive of the last delimiter. We need to strip return res.BlobPrefixes, nil
// it.
ret := make([]string, 0, len(res.BlobPrefixes))
for _, prefix := range res.BlobPrefixes {
ret = append(ret, prefix[0:strings.LastIndex(prefix, delimiter)])
}
return ret, nil
} }
func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) { func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) {

View File

@ -21,7 +21,6 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"strings"
"time" "time"
"cloud.google.com/go/storage" "cloud.google.com/go/storage"
@ -98,7 +97,7 @@ func (o *objectStore) Init(config map[string]string) error {
return nil return nil
} }
func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error { func (o *objectStore) PutObject(bucket, key string, body io.Reader) error {
w := o.bucketWriter.getWriteCloser(bucket, key) w := o.bucketWriter.getWriteCloser(bucket, key)
// The writer returned by NewWriter is asynchronous, so errors aren't guaranteed // The writer returned by NewWriter is asynchronous, so errors aren't guaranteed
@ -114,7 +113,7 @@ func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error
return closeErr return closeErr
} }
func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error) { func (o *objectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
r, err := o.client.Bucket(bucket).Object(key).NewReader(context.Background()) r, err := o.client.Bucket(bucket).Object(key).NewReader(context.Background())
if err != nil { if err != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
@ -123,28 +122,30 @@ func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error
return r, nil return r, nil
} }
func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) { func (o *objectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
q := &storage.Query{ q := &storage.Query{
Prefix: prefix,
Delimiter: delimiter, Delimiter: delimiter,
} }
var res []string
iter := o.client.Bucket(bucket).Objects(context.Background(), q) iter := o.client.Bucket(bucket).Objects(context.Background(), q)
var res []string
for { for {
obj, err := iter.Next() obj, err := iter.Next()
if err == iterator.Done { if err != nil && err != iterator.Done {
return res, nil
}
if err != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
if err == iterator.Done {
break
}
if obj.Prefix != "" { if obj.Prefix != "" {
res = append(res, obj.Prefix[0:strings.LastIndex(obj.Prefix, delimiter)]) res = append(res, obj.Prefix)
} }
} }
return res, nil
} }
func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) { func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) {
@ -169,7 +170,7 @@ func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) {
} }
} }
func (o *objectStore) DeleteObject(bucket string, key string) error { func (o *objectStore) DeleteObject(bucket, key string) error {
return errors.Wrapf(o.client.Bucket(bucket).Object(key).Delete(context.Background()), "error deleting object %s", key) return errors.Wrapf(o.client.Bucket(bucket).Object(key).Delete(context.Background()), "error deleting object %s", key)
} }

View File

@ -0,0 +1,168 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudprovider
import (
"bytes"
"errors"
"io"
"io/ioutil"
"strings"
"time"
)
type BucketData map[string][]byte
// InMemoryObjectStore is a simple implementation of the ObjectStore interface
// that stores its data in-memory/in-proc. This is mainly intended to be used
// as a test fake.
type InMemoryObjectStore struct {
Data map[string]BucketData
}
func NewInMemoryObjectStore(buckets ...string) *InMemoryObjectStore {
o := &InMemoryObjectStore{
Data: make(map[string]BucketData),
}
for _, bucket := range buckets {
o.Data[bucket] = make(map[string][]byte)
}
return o
}
//
// Interface Implementation
//
func (o *InMemoryObjectStore) Init(config map[string]string) error {
return nil
}
func (o *InMemoryObjectStore) PutObject(bucket, key string, body io.Reader) error {
bucketData, ok := o.Data[bucket]
if !ok {
return errors.New("bucket not found")
}
obj, err := ioutil.ReadAll(body)
if err != nil {
return err
}
bucketData[key] = obj
return nil
}
func (o *InMemoryObjectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
bucketData, ok := o.Data[bucket]
if !ok {
return nil, errors.New("bucket not found")
}
obj, ok := bucketData[key]
if !ok {
return nil, errors.New("key not found")
}
return ioutil.NopCloser(bytes.NewReader(obj)), nil
}
func (o *InMemoryObjectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
keys, err := o.ListObjects(bucket, prefix)
if err != nil {
return nil, err
}
// For each key, check if it has an instance of the delimiter *after* the prefix.
// If not, skip it; if so, return the prefix of the key up to/including the delimiter.
var prefixes []string
for _, key := range keys {
// everything after 'prefix'
afterPrefix := key[len(prefix):]
// index of the *start* of 'delimiter' in 'afterPrefix'
delimiterStart := strings.Index(afterPrefix, delimiter)
if delimiterStart == -1 {
continue
}
// return the prefix, plus everything after the prefix and before
// the delimiter, plus the delimiter
fullPrefix := prefix + afterPrefix[0:delimiterStart] + delimiter
prefixes = append(prefixes, fullPrefix)
}
return prefixes, nil
}
func (o *InMemoryObjectStore) ListObjects(bucket, prefix string) ([]string, error) {
bucketData, ok := o.Data[bucket]
if !ok {
return nil, errors.New("bucket not found")
}
var objs []string
for key := range bucketData {
if strings.HasPrefix(key, prefix) {
objs = append(objs, key)
}
}
return objs, nil
}
func (o *InMemoryObjectStore) DeleteObject(bucket, key string) error {
bucketData, ok := o.Data[bucket]
if !ok {
return errors.New("bucket not found")
}
delete(bucketData, key)
return nil
}
func (o *InMemoryObjectStore) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) {
bucketData, ok := o.Data[bucket]
if !ok {
return "", errors.New("bucket not found")
}
_, ok = bucketData[key]
if !ok {
return "", errors.New("key not found")
}
return "a-url", nil
}
//
// Test Helper Methods
//
func (o *InMemoryObjectStore) ClearBucket(bucket string) {
if _, ok := o.Data[bucket]; !ok {
return
}
o.Data[bucket] = make(map[string][]byte)
}

View File

@ -1,48 +0,0 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import mock "github.com/stretchr/testify/mock"
import v1 "github.com/heptio/ark/pkg/apis/ark/v1"
// BackupLister is an autogenerated mock type for the BackupLister type
type BackupLister struct {
mock.Mock
}
// ListBackups provides a mock function with given fields: bucket
func (_m *BackupLister) ListBackups(bucket string) ([]*v1.Backup, error) {
ret := _m.Called(bucket)
var r0 []*v1.Backup
if rf, ok := ret.Get(0).(func(string) []*v1.Backup); ok {
r0 = rf(bucket)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*v1.Backup)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(bucket)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

View File

@ -31,17 +31,23 @@ type ObjectStore interface {
// PutObject creates a new object using the data in body within the specified // PutObject creates a new object using the data in body within the specified
// object storage bucket with the given key. // object storage bucket with the given key.
PutObject(bucket string, key string, body io.Reader) error PutObject(bucket, key string, body io.Reader) error
// GetObject retrieves the object with the given key from the specified // GetObject retrieves the object with the given key from the specified
// bucket in object storage. // bucket in object storage.
GetObject(bucket string, key string) (io.ReadCloser, error) GetObject(bucket, key string) (io.ReadCloser, error)
// ListCommonPrefixes gets a list of all object key prefixes that come // ListCommonPrefixes gets a list of all object key prefixes that start with
// before the provided delimiter. For example, if the bucket contains // the specified prefix and stop at the next instance of the provided delimiter.
// the keys "foo-1/bar", "foo-1/baz", and "foo-2/baz", and the delimiter //
// is "/", this will return the slice {"foo-1", "foo-2"}. // For example, if the bucket contains the following keys:
ListCommonPrefixes(bucket string, delimiter string) ([]string, error) // a-prefix/foo-1/bar
// a-prefix/foo-1/baz
// a-prefix/foo-2/baz
// some-other-prefix/foo-3/bar
// and the provided prefix arg is "a-prefix/", and the delimiter is "/",
// this will return the slice {"a-prefix/foo-1/", "a-prefix/foo-2/"}.
ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error)
// ListObjects gets a list of all keys in the specified bucket // ListObjects gets a list of all keys in the specified bucket
// that have the given prefix. // that have the given prefix.
@ -49,7 +55,7 @@ type ObjectStore interface {
// DeleteObject removes the object with the specified key from the given // DeleteObject removes the object with the specified key from the given
// bucket. // bucket.
DeleteObject(bucket string, key string) error DeleteObject(bucket, key string) error
// CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl. // CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl.
CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error)

View File

@ -42,7 +42,6 @@ import (
api "github.com/heptio/ark/pkg/apis/ark/v1" api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/backup" "github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/cloudprovider"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
@ -74,6 +73,7 @@ type backupController struct {
backupLocationListerSynced cache.InformerSynced backupLocationListerSynced cache.InformerSynced
defaultBackupLocation string defaultBackupLocation string
metrics *metrics.ServerMetrics metrics *metrics.ServerMetrics
newBackupStore func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error)
} }
func NewBackupController( func NewBackupController(
@ -105,6 +105,8 @@ func NewBackupController(
backupLocationListerSynced: backupLocationInformer.Informer().HasSynced, backupLocationListerSynced: backupLocationInformer.Informer().HasSynced,
defaultBackupLocation: defaultBackupLocation, defaultBackupLocation: defaultBackupLocation,
metrics: metrics, metrics: metrics,
newBackupStore: persistence.NewObjectBackupStore,
} }
c.syncHandler = c.processBackup c.syncHandler = c.processBackup
@ -382,21 +384,21 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation
log.Info("Starting backup") log.Info("Starting backup")
pluginManager := controller.newPluginManager(log)
defer pluginManager.CleanupClients()
backupFile, err := ioutil.TempFile("", "") backupFile, err := ioutil.TempFile("", "")
if err != nil { if err != nil {
return errors.Wrap(err, "error creating temp file for backup") return errors.Wrap(err, "error creating temp file for backup")
} }
defer closeAndRemoveFile(backupFile, log) defer closeAndRemoveFile(backupFile, log)
pluginManager := controller.newPluginManager(log)
defer pluginManager.CleanupClients()
actions, err := pluginManager.GetBackupItemActions() actions, err := pluginManager.GetBackupItemActions()
if err != nil { if err != nil {
return err return err
} }
objectStore, err := getObjectStoreForLocation(backupLocation, pluginManager) backupStore, err := controller.newBackupStore(backupLocation, pluginManager, log)
if err != nil { if err != nil {
return err return err
} }
@ -438,7 +440,7 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation
controller.logger.WithError(err).Error("error closing gzippedLogFile") controller.logger.WithError(err).Error("error closing gzippedLogFile")
} }
if err := persistence.UploadBackup(log, objectStore, backupLocation.Spec.ObjectStorage.Bucket, backup.Name, backupJSONToUpload, backupFileToUpload, logFile); err != nil { if err := backupStore.PutBackup(backup.Name, backupJSONToUpload, backupFileToUpload, logFile); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
@ -454,34 +456,6 @@ func (controller *backupController) runBackup(backup *api.Backup, backupLocation
return kerrors.NewAggregate(errs) return kerrors.NewAggregate(errs)
} }
// TODO(ncdc): move this to a better location that isn't backup specific
func getObjectStoreForLocation(location *api.BackupStorageLocation, manager plugin.Manager) (cloudprovider.ObjectStore, error) {
if location.Spec.Provider == "" {
return nil, errors.New("backup storage location provider name must not be empty")
}
objectStore, err := manager.GetObjectStore(location.Spec.Provider)
if err != nil {
return nil, err
}
// add the bucket name to the config map so that object stores can use
// it when initializing. The AWS object store uses this to determine the
// bucket's region when setting up its client.
if location.Spec.ObjectStorage != nil {
if location.Spec.Config == nil {
location.Spec.Config = make(map[string]string)
}
location.Spec.Config["bucket"] = location.Spec.ObjectStorage.Bucket
}
if err := objectStore.Init(location.Spec.Config); err != nil {
return nil, err
}
return objectStore, nil
}
func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) { func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) {
if err := file.Close(); err != nil { if err := file.Close(); err != nil {
log.WithError(err).WithField("file", file.Name()).Error("error closing file") log.WithError(err).WithField("file", file.Name()).Error("error closing file")

View File

@ -19,26 +19,27 @@ package controller
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/backup" "github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions" informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/metrics" "github.com/heptio/ark/pkg/metrics"
"github.com/heptio/ark/pkg/persistence"
persistencemocks "github.com/heptio/ark/pkg/persistence/mocks"
"github.com/heptio/ark/pkg/plugin" "github.com/heptio/ark/pkg/plugin"
pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" pluginmocks "github.com/heptio/ark/pkg/plugin/mocks"
"github.com/heptio/ark/pkg/util/collections" "github.com/heptio/ark/pkg/util/collections"
@ -179,12 +180,12 @@ func TestProcessBackup(t *testing.T) {
sharedInformers = informers.NewSharedInformerFactory(client, 0) sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = logging.DefaultLogger(logrus.DebugLevel) logger = logging.DefaultLogger(logrus.DebugLevel)
clockTime, _ = time.Parse("Mon Jan 2 15:04:05 2006", "Mon Jan 2 15:04:05 2006") clockTime, _ = time.Parse("Mon Jan 2 15:04:05 2006", "Mon Jan 2 15:04:05 2006")
objectStore = &arktest.ObjectStore{}
pluginManager = &pluginmocks.Manager{} pluginManager = &pluginmocks.Manager{}
backupStore = &persistencemocks.BackupStore{}
) )
defer backupper.AssertExpectations(t) defer backupper.AssertExpectations(t)
defer objectStore.AssertExpectations(t)
defer pluginManager.AssertExpectations(t) defer pluginManager.AssertExpectations(t)
defer backupStore.AssertExpectations(t)
c := NewBackupController( c := NewBackupController(
sharedInformers.Ark().V1().Backups(), sharedInformers.Ark().V1().Backups(),
@ -202,6 +203,10 @@ func TestProcessBackup(t *testing.T) {
c.clock = clock.NewFakeClock(clockTime) c.clock = clock.NewFakeClock(clockTime)
c.newBackupStore = func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) {
return backupStore, nil
}
var expiration, startTime time.Time var expiration, startTime time.Time
if test.backup != nil { if test.backup != nil {
@ -217,9 +222,6 @@ func TestProcessBackup(t *testing.T) {
} }
if test.expectBackup { if test.expectBackup {
pluginManager.On("GetObjectStore", "myCloud").Return(objectStore, nil)
objectStore.On("Init", mock.Anything).Return(nil)
// set up a Backup object to represent what we expect to be passed to backupper.Backup() // set up a Backup object to represent what we expect to be passed to backupper.Backup()
backup := test.backup.DeepCopy() backup := test.backup.DeepCopy()
backup.Spec.IncludedResources = test.expectedIncludes backup.Spec.IncludedResources = test.expectedIncludes
@ -278,11 +280,8 @@ func TestProcessBackup(t *testing.T) {
return strings.Contains(json, timeString) return strings.Contains(json, timeString)
} }
objectStore.On("PutObject", "bucket", fmt.Sprintf("%s/%s-logs.gz", test.backup.Name, test.backup.Name), mock.Anything).Return(nil) backupStore.On("PutBackup", test.backup.Name, mock.MatchedBy(completionTimestampIsPresent), mock.Anything, mock.Anything).Return(nil)
objectStore.On("PutObject", "bucket", fmt.Sprintf("%s/ark-backup.json", test.backup.Name), mock.MatchedBy(completionTimestampIsPresent)).Return(nil) pluginManager.On("CleanupClients").Return()
objectStore.On("PutObject", "bucket", fmt.Sprintf("%s/%s.tar.gz", test.backup.Name, test.backup.Name), mock.Anything).Return(nil)
pluginManager.On("CleanupClients")
} }
// this is necessary so the Patch() call returns the appropriate object // this is necessary so the Patch() call returns the appropriate object

View File

@ -58,10 +58,10 @@ type backupDeletionController struct {
resticMgr restic.RepositoryManager resticMgr restic.RepositoryManager
podvolumeBackupLister listers.PodVolumeBackupLister podvolumeBackupLister listers.PodVolumeBackupLister
backupLocationLister listers.BackupStorageLocationLister backupLocationLister listers.BackupStorageLocationLister
deleteBackupDir persistence.DeleteBackupDirFunc
processRequestFunc func(*v1.DeleteBackupRequest) error processRequestFunc func(*v1.DeleteBackupRequest) error
clock clock.Clock clock clock.Clock
newPluginManager func(logrus.FieldLogger) plugin.Manager newPluginManager func(logrus.FieldLogger) plugin.Manager
newBackupStore func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error)
} }
// NewBackupDeletionController creates a new backup deletion controller. // NewBackupDeletionController creates a new backup deletion controller.
@ -95,7 +95,7 @@ func NewBackupDeletionController(
// use variables to refer to these functions so they can be // use variables to refer to these functions so they can be
// replaced with fakes for testing. // replaced with fakes for testing.
newPluginManager: newPluginManager, newPluginManager: newPluginManager,
deleteBackupDir: persistence.DeleteBackupDir, newBackupStore: persistence.NewObjectBackupStore,
clock: &clock.RealClock{}, clock: &clock.RealClock{},
} }
@ -322,12 +322,12 @@ func (c *backupDeletionController) deleteBackupFromStorage(backup *v1.Backup, lo
return errors.WithStack(err) return errors.WithStack(err)
} }
objectStore, err := getObjectStoreForLocation(backupLocation, pluginManager) backupStore, err := c.newBackupStore(backupLocation, pluginManager, log)
if err != nil { if err != nil {
return err return err
} }
if err := c.deleteBackupDir(log, objectStore, backupLocation.Spec.ObjectStorage.Bucket, backup.Name); err != nil { if err := backupStore.DeleteBackup(backup.Name); err != nil {
return errors.Wrap(err, "error deleting backup from backup storage") return errors.Wrap(err, "error deleting backup from backup storage")
} }

View File

@ -21,25 +21,27 @@ import (
"testing" "testing"
"time" "time"
"github.com/heptio/ark/pkg/apis/ark/v1"
pkgbackup "github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/plugin"
pluginmocks "github.com/heptio/ark/pkg/plugin/mocks"
arktest "github.com/heptio/ark/pkg/util/test"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"github.com/heptio/ark/pkg/apis/ark/v1"
pkgbackup "github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/persistence"
persistencemocks "github.com/heptio/ark/pkg/persistence/mocks"
"github.com/heptio/ark/pkg/plugin"
pluginmocks "github.com/heptio/ark/pkg/plugin/mocks"
arktest "github.com/heptio/ark/pkg/util/test"
) )
func TestBackupDeletionControllerProcessQueueItem(t *testing.T) { func TestBackupDeletionControllerProcessQueueItem(t *testing.T) {
@ -112,7 +114,7 @@ type backupDeletionControllerTestData struct {
client *fake.Clientset client *fake.Clientset
sharedInformers informers.SharedInformerFactory sharedInformers informers.SharedInformerFactory
blockStore *arktest.FakeBlockStore blockStore *arktest.FakeBlockStore
objectStore *arktest.ObjectStore backupStore *persistencemocks.BackupStore
controller *backupDeletionController controller *backupDeletionController
req *v1.DeleteBackupRequest req *v1.DeleteBackupRequest
} }
@ -123,7 +125,7 @@ func setupBackupDeletionControllerTest(objects ...runtime.Object) *backupDeletio
sharedInformers = informers.NewSharedInformerFactory(client, 0) sharedInformers = informers.NewSharedInformerFactory(client, 0)
blockStore = &arktest.FakeBlockStore{SnapshotsTaken: sets.NewString()} blockStore = &arktest.FakeBlockStore{SnapshotsTaken: sets.NewString()}
pluginManager = &pluginmocks.Manager{} pluginManager = &pluginmocks.Manager{}
objectStore = &arktest.ObjectStore{} backupStore = &persistencemocks.BackupStore{}
req = pkgbackup.NewDeleteBackupRequest("foo", "uid") req = pkgbackup.NewDeleteBackupRequest("foo", "uid")
) )
@ -131,7 +133,7 @@ func setupBackupDeletionControllerTest(objects ...runtime.Object) *backupDeletio
client: client, client: client,
sharedInformers: sharedInformers, sharedInformers: sharedInformers,
blockStore: blockStore, blockStore: blockStore,
objectStore: objectStore, backupStore: backupStore,
controller: NewBackupDeletionController( controller: NewBackupDeletionController(
arktest.NewLogger(), arktest.NewLogger(),
sharedInformers.Ark().V1().DeleteBackupRequests(), sharedInformers.Ark().V1().DeleteBackupRequests(),
@ -150,7 +152,10 @@ func setupBackupDeletionControllerTest(objects ...runtime.Object) *backupDeletio
req: req, req: req,
} }
pluginManager.On("GetObjectStore", "objStoreProvider").Return(objectStore, nil) data.controller.newBackupStore = func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) {
return backupStore, nil
}
pluginManager.On("CleanupClients").Return(nil) pluginManager.On("CleanupClients").Return(nil)
req.Namespace = "heptio-ark" req.Namespace = "heptio-ark"
@ -388,8 +393,6 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) {
} }
require.NoError(t, td.sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(location)) require.NoError(t, td.sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(location))
td.objectStore.On("Init", mock.Anything).Return(nil)
// Clear out req labels to make sure the controller adds them // Clear out req labels to make sure the controller adds them
td.req.Labels = make(map[string]string) td.req.Labels = make(map[string]string)
@ -406,12 +409,7 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) {
return true, backup, nil return true, backup, nil
}) })
td.controller.deleteBackupDir = func(_ logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket, backupName string) error { td.backupStore.On("DeleteBackup", td.req.Spec.BackupName).Return(nil)
require.NotNil(t, objectStore)
require.Equal(t, location.Spec.ObjectStorage.Bucket, bucket)
require.Equal(t, td.req.Spec.BackupName, backupName)
return nil
}
err := td.controller.processRequest(td.req) err := td.controller.processRequest(td.req)
require.NoError(t, err) require.NoError(t, err)

View File

@ -29,7 +29,6 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/cloudprovider"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
@ -48,7 +47,7 @@ type backupSyncController struct {
namespace string namespace string
defaultBackupLocation string defaultBackupLocation string
newPluginManager func(logrus.FieldLogger) plugin.Manager newPluginManager func(logrus.FieldLogger) plugin.Manager
listCloudBackups func(logrus.FieldLogger, cloudprovider.ObjectStore, string) ([]*arkv1api.Backup, error) newBackupStore func(*arkv1api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error)
} }
func NewBackupSyncController( func NewBackupSyncController(
@ -77,7 +76,7 @@ func NewBackupSyncController(
// use variables to refer to these functions so they can be // use variables to refer to these functions so they can be
// replaced with fakes for testing. // replaced with fakes for testing.
newPluginManager: newPluginManager, newPluginManager: newPluginManager,
listCloudBackups: persistence.ListBackups, newBackupStore: persistence.NewObjectBackupStore,
} }
c.resyncFunc = c.run c.resyncFunc = c.run
@ -109,19 +108,19 @@ func (c *backupSyncController) run() {
log := c.logger.WithField("backupLocation", location.Name) log := c.logger.WithField("backupLocation", location.Name)
log.Info("Syncing backups from backup location") log.Info("Syncing backups from backup location")
objectStore, err := getObjectStoreForLocation(location, pluginManager) backupStore, err := c.newBackupStore(location, pluginManager, log)
if err != nil { if err != nil {
log.WithError(err).Error("Error getting object store for location") log.WithError(err).Error("Error getting backup store for location")
continue continue
} }
backupsInBackupStore, err := c.listCloudBackups(log, objectStore, location.Spec.ObjectStorage.Bucket) backupsInBackupStore, err := backupStore.ListBackups()
if err != nil { if err != nil {
log.WithError(err).Error("Error listing backups in object store") log.WithError(err).Error("Error listing backups in backup store")
continue continue
} }
log.WithField("backupCount", len(backupsInBackupStore)).Info("Got backups from object store") log.WithField("backupCount", len(backupsInBackupStore)).Info("Got backups from backup store")
cloudBackupNames := sets.NewString() cloudBackupNames := sets.NewString()
for _, cloudBackup := range backupsInBackupStore { for _, cloudBackup := range backupsInBackupStore {

View File

@ -20,25 +20,23 @@ import (
"testing" "testing"
"time" "time"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions" informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/persistence"
persistencemocks "github.com/heptio/ark/pkg/persistence/mocks"
"github.com/heptio/ark/pkg/plugin" "github.com/heptio/ark/pkg/plugin"
pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" pluginmocks "github.com/heptio/ark/pkg/plugin/mocks"
"github.com/heptio/ark/pkg/util/stringslice" "github.com/heptio/ark/pkg/util/stringslice"
arktest "github.com/heptio/ark/pkg/util/test" arktest "github.com/heptio/ark/pkg/util/test"
"github.com/stretchr/testify/assert"
) )
func defaultLocationsList(namespace string) []*arkv1api.BackupStorageLocation { func defaultLocationsList(namespace string) []*arkv1api.BackupStorageLocation {
@ -167,7 +165,7 @@ func TestBackupSyncControllerRun(t *testing.T) {
client = fake.NewSimpleClientset() client = fake.NewSimpleClientset()
sharedInformers = informers.NewSharedInformerFactory(client, 0) sharedInformers = informers.NewSharedInformerFactory(client, 0)
pluginManager = &pluginmocks.Manager{} pluginManager = &pluginmocks.Manager{}
objectStore = &arktest.ObjectStore{} backupStores = make(map[string]*persistencemocks.BackupStore)
) )
c := NewBackupSyncController( c := NewBackupSyncController(
@ -181,22 +179,23 @@ func TestBackupSyncControllerRun(t *testing.T) {
arktest.NewLogger(), arktest.NewLogger(),
).(*backupSyncController) ).(*backupSyncController)
pluginManager.On("GetObjectStore", "objStoreProvider").Return(objectStore, nil) c.newBackupStore = func(loc *arkv1api.BackupStorageLocation, _ persistence.ObjectStoreGetter, _ logrus.FieldLogger) (persistence.BackupStore, error) {
pluginManager.On("CleanupClients").Return(nil) // this gets populated just below, prior to exercising the method under test
return backupStores[loc.Name], nil
}
objectStore.On("Init", mock.Anything).Return(nil) pluginManager.On("CleanupClients").Return(nil)
for _, location := range test.locations { for _, location := range test.locations {
require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(location)) require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(location))
backupStores[location.Name] = &persistencemocks.BackupStore{}
} }
c.listCloudBackups = func(_ logrus.FieldLogger, _ cloudprovider.ObjectStore, bucket string) ([]*arkv1api.Backup, error) { for _, location := range test.locations {
backups, ok := test.cloudBackups[bucket] backupStore, ok := backupStores[location.Name]
if !ok { require.True(t, ok, "no mock backup store for location %s", location.Name)
return nil, errors.New("bucket not found")
}
return backups, nil backupStore.On("ListBackups").Return(test.cloudBackups[location.Spec.ObjectStorage.Bucket], nil)
} }
for _, existingBackup := range test.existingBackups { for _, existingBackup := range test.existingBackups {

View File

@ -47,10 +47,10 @@ type downloadRequestController struct {
downloadRequestLister listers.DownloadRequestLister downloadRequestLister listers.DownloadRequestLister
restoreLister listers.RestoreLister restoreLister listers.RestoreLister
clock clock.Clock clock clock.Clock
createSignedURL persistence.CreateSignedURLFunc
backupLocationLister listers.BackupStorageLocationLister backupLocationLister listers.BackupStorageLocationLister
backupLister listers.BackupLister backupLister listers.BackupLister
newPluginManager func(logrus.FieldLogger) plugin.Manager newPluginManager func(logrus.FieldLogger) plugin.Manager
newBackupStore func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error)
} }
// NewDownloadRequestController creates a new DownloadRequestController. // NewDownloadRequestController creates a new DownloadRequestController.
@ -73,8 +73,8 @@ func NewDownloadRequestController(
// use variables to refer to these functions so they can be // use variables to refer to these functions so they can be
// replaced with fakes for testing. // replaced with fakes for testing.
createSignedURL: persistence.CreateSignedURL,
newPluginManager: newPluginManager, newPluginManager: newPluginManager,
newBackupStore: persistence.NewObjectBackupStore,
clock: &clock.RealClock{}, clock: &clock.RealClock{},
} }
@ -146,8 +146,8 @@ func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.Dow
update := downloadRequest.DeepCopy() update := downloadRequest.DeepCopy()
var ( var (
directory string backupName string
err error err error
) )
switch downloadRequest.Spec.Target.Kind { switch downloadRequest.Spec.Target.Kind {
@ -157,12 +157,12 @@ func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.Dow
return errors.Wrap(err, "error getting Restore") return errors.Wrap(err, "error getting Restore")
} }
directory = restore.Spec.BackupName backupName = restore.Spec.BackupName
default: default:
directory = downloadRequest.Spec.Target.Name backupName = downloadRequest.Spec.Target.Name
} }
backup, err := c.backupLister.Backups(downloadRequest.Namespace).Get(directory) backup, err := c.backupLister.Backups(downloadRequest.Namespace).Get(backupName)
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
@ -175,18 +175,17 @@ func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.Dow
pluginManager := c.newPluginManager(log) pluginManager := c.newPluginManager(log)
defer pluginManager.CleanupClients() defer pluginManager.CleanupClients()
objectStore, err := getObjectStoreForLocation(backupLocation, pluginManager) backupStore, err := c.newBackupStore(backupLocation, pluginManager, log)
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
} }
update.Status.DownloadURL, err = c.createSignedURL(objectStore, downloadRequest.Spec.Target, backupLocation.Spec.ObjectStorage.Bucket, directory, signedURLTTL) if update.Status.DownloadURL, err = backupStore.GetDownloadURL(backupName, downloadRequest.Spec.Target); err != nil {
if err != nil {
return err return err
} }
update.Status.Phase = v1.DownloadRequestPhaseProcessed update.Status.Phase = v1.DownloadRequestPhaseProcessed
update.Status.Expiration = metav1.NewTime(c.clock.Now().Add(signedURLTTL)) update.Status.Expiration = metav1.NewTime(c.clock.Now().Add(persistence.DownloadURLTTL))
_, err = patchDownloadRequest(downloadRequest, update, c.downloadRequestClient) _, err = patchDownloadRequest(downloadRequest, update, c.downloadRequestClient)
return errors.WithStack(err) return errors.WithStack(err)

View File

@ -22,7 +22,6 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -32,6 +31,8 @@ import (
"github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions" informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/persistence"
persistencemocks "github.com/heptio/ark/pkg/persistence/mocks"
"github.com/heptio/ark/pkg/plugin" "github.com/heptio/ark/pkg/plugin"
pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" pluginmocks "github.com/heptio/ark/pkg/plugin/mocks"
kubeutil "github.com/heptio/ark/pkg/util/kube" kubeutil "github.com/heptio/ark/pkg/util/kube"
@ -42,7 +43,7 @@ type downloadRequestTestHarness struct {
client *fake.Clientset client *fake.Clientset
informerFactory informers.SharedInformerFactory informerFactory informers.SharedInformerFactory
pluginManager *pluginmocks.Manager pluginManager *pluginmocks.Manager
objectStore *arktest.ObjectStore backupStore *persistencemocks.BackupStore
controller *downloadRequestController controller *downloadRequestController
} }
@ -52,7 +53,7 @@ func newDownloadRequestTestHarness(t *testing.T) *downloadRequestTestHarness {
client = fake.NewSimpleClientset() client = fake.NewSimpleClientset()
informerFactory = informers.NewSharedInformerFactory(client, 0) informerFactory = informers.NewSharedInformerFactory(client, 0)
pluginManager = new(pluginmocks.Manager) pluginManager = new(pluginmocks.Manager)
objectStore = new(arktest.ObjectStore) backupStore = new(persistencemocks.BackupStore)
controller = NewDownloadRequestController( controller = NewDownloadRequestController(
client.ArkV1(), client.ArkV1(),
informerFactory.Ark().V1().DownloadRequests(), informerFactory.Ark().V1().DownloadRequests(),
@ -66,17 +67,19 @@ func newDownloadRequestTestHarness(t *testing.T) *downloadRequestTestHarness {
clockTime, err := time.Parse(time.RFC1123, time.RFC1123) clockTime, err := time.Parse(time.RFC1123, time.RFC1123)
require.NoError(t, err) require.NoError(t, err)
controller.clock = clock.NewFakeClock(clockTime) controller.clock = clock.NewFakeClock(clockTime)
controller.newBackupStore = func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) {
return backupStore, nil
}
pluginManager.On("CleanupClients").Return() pluginManager.On("CleanupClients").Return()
objectStore.On("Init", mock.Anything).Return(nil)
return &downloadRequestTestHarness{ return &downloadRequestTestHarness{
client: client, client: client,
informerFactory: informerFactory, informerFactory: informerFactory,
pluginManager: pluginManager, pluginManager: pluginManager,
objectStore: objectStore, backupStore: backupStore,
controller: controller, controller: controller,
} }
} }
@ -118,15 +121,15 @@ func newBackupLocation(name, provider, bucket string) *v1.BackupStorageLocation
func TestProcessDownloadRequest(t *testing.T) { func TestProcessDownloadRequest(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
key string key string
downloadRequest *v1.DownloadRequest downloadRequest *v1.DownloadRequest
backup *v1.Backup backup *v1.Backup
restore *v1.Restore restore *v1.Restore
backupLocation *v1.BackupStorageLocation backupLocation *v1.BackupStorageLocation
expired bool expired bool
expectedErr string expectedErr string
expectedRequestedObject string expectGetsURL bool
}{ }{
{ {
name: "empty key returns without error", name: "empty key returns without error",
@ -163,64 +166,64 @@ func TestProcessDownloadRequest(t *testing.T) {
expectedErr: "backupstoragelocation.ark.heptio.com \"a-location\" not found", expectedErr: "backupstoragelocation.ark.heptio.com \"a-location\" not found",
}, },
{ {
name: "backup contents request with phase '' gets a url", name: "backup contents request with phase '' gets a url",
downloadRequest: newDownloadRequest("", v1.DownloadTargetKindBackupContents, "a-backup"), downloadRequest: newDownloadRequest("", v1.DownloadTargetKindBackupContents, "a-backup"),
backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup,
backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"),
expectedRequestedObject: "a-backup/a-backup.tar.gz", expectGetsURL: true,
}, },
{ {
name: "backup contents request with phase 'New' gets a url", name: "backup contents request with phase 'New' gets a url",
downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindBackupContents, "a-backup"), downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindBackupContents, "a-backup"),
backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup,
backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"),
expectedRequestedObject: "a-backup/a-backup.tar.gz", expectGetsURL: true,
}, },
{ {
name: "backup log request with phase '' gets a url", name: "backup log request with phase '' gets a url",
downloadRequest: newDownloadRequest("", v1.DownloadTargetKindBackupLog, "a-backup"), downloadRequest: newDownloadRequest("", v1.DownloadTargetKindBackupLog, "a-backup"),
backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup,
backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"),
expectedRequestedObject: "a-backup/a-backup-logs.gz", expectGetsURL: true,
}, },
{ {
name: "backup log request with phase 'New' gets a url", name: "backup log request with phase 'New' gets a url",
downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindBackupLog, "a-backup"), downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindBackupLog, "a-backup"),
backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup,
backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"),
expectedRequestedObject: "a-backup/a-backup-logs.gz", expectGetsURL: true,
}, },
{ {
name: "restore log request with phase '' gets a url", name: "restore log request with phase '' gets a url",
downloadRequest: newDownloadRequest("", v1.DownloadTargetKindRestoreLog, "a-backup-20170912150214"), downloadRequest: newDownloadRequest("", v1.DownloadTargetKindRestoreLog, "a-backup-20170912150214"),
restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore,
backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup,
backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"),
expectedRequestedObject: "a-backup/restore-a-backup-20170912150214-logs.gz", expectGetsURL: true,
}, },
{ {
name: "restore log request with phase 'New' gets a url", name: "restore log request with phase 'New' gets a url",
downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindRestoreLog, "a-backup-20170912150214"), downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindRestoreLog, "a-backup-20170912150214"),
restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore,
backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup,
backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"),
expectedRequestedObject: "a-backup/restore-a-backup-20170912150214-logs.gz", expectGetsURL: true,
}, },
{ {
name: "restore results request with phase '' gets a url", name: "restore results request with phase '' gets a url",
downloadRequest: newDownloadRequest("", v1.DownloadTargetKindRestoreResults, "a-backup-20170912150214"), downloadRequest: newDownloadRequest("", v1.DownloadTargetKindRestoreResults, "a-backup-20170912150214"),
restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore,
backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup,
backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"),
expectedRequestedObject: "a-backup/restore-a-backup-20170912150214-results.gz", expectGetsURL: true,
}, },
{ {
name: "restore results request with phase 'New' gets a url", name: "restore results request with phase 'New' gets a url",
downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindRestoreResults, "a-backup-20170912150214"), downloadRequest: newDownloadRequest(v1.DownloadRequestPhaseNew, v1.DownloadTargetKindRestoreResults, "a-backup-20170912150214"),
restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore, restore: arktest.NewTestRestore(v1.DefaultNamespace, "a-backup-20170912150214", v1.RestorePhaseCompleted).WithBackup("a-backup").Restore,
backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup, backup: arktest.NewTestBackup().WithName("a-backup").WithStorageLocation("a-location").Backup,
backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"), backupLocation: newBackupLocation("a-location", "a-provider", "a-bucket"),
expectedRequestedObject: "a-backup/restore-a-backup-20170912150214-results.gz", expectGetsURL: true,
}, },
{ {
name: "request with phase 'Processed' is not deleted if not expired", name: "request with phase 'Processed' is not deleted if not expired",
@ -268,12 +271,10 @@ func TestProcessDownloadRequest(t *testing.T) {
if tc.backupLocation != nil { if tc.backupLocation != nil {
require.NoError(t, harness.informerFactory.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(tc.backupLocation)) require.NoError(t, harness.informerFactory.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(tc.backupLocation))
harness.pluginManager.On("GetObjectStore", tc.backupLocation.Spec.Provider).Return(harness.objectStore, nil)
} }
if tc.expectedRequestedObject != "" { if tc.expectGetsURL {
harness.objectStore.On("CreateSignedURL", tc.backupLocation.Spec.ObjectStorage.Bucket, tc.expectedRequestedObject, mock.Anything).Return("a-url", nil) harness.backupStore.On("GetDownloadURL", tc.backup.Name, tc.downloadRequest.Spec.Target).Return("a-url", nil)
} }
// exercise method under test // exercise method under test
@ -291,7 +292,7 @@ func TestProcessDownloadRequest(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
} }
if tc.expectedRequestedObject != "" { if tc.expectGetsURL {
output, err := harness.client.ArkV1().DownloadRequests(tc.downloadRequest.Namespace).Get(tc.downloadRequest.Name, metav1.GetOptions{}) output, err := harness.client.ArkV1().DownloadRequests(tc.downloadRequest.Namespace).Get(tc.downloadRequest.Name, metav1.GetOptions{})
require.NoError(t, err) require.NoError(t, err)

View File

@ -41,7 +41,6 @@ import (
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
api "github.com/heptio/ark/pkg/apis/ark/v1" api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/cloudprovider"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
@ -90,11 +89,8 @@ type restoreController struct {
defaultBackupLocation string defaultBackupLocation string
metrics *metrics.ServerMetrics metrics *metrics.ServerMetrics
getBackup persistence.GetBackupFunc newPluginManager func(logger logrus.FieldLogger) plugin.Manager
downloadBackup persistence.DownloadBackupFunc newBackupStore func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error)
uploadRestoreLog persistence.UploadRestoreLogFunc
uploadRestoreResults persistence.UploadRestoreResultsFunc
newPluginManager func(logger logrus.FieldLogger) plugin.Manager
} }
func NewRestoreController( func NewRestoreController(
@ -132,11 +128,8 @@ func NewRestoreController(
// use variables to refer to these functions so they can be // use variables to refer to these functions so they can be
// replaced with fakes for testing. // replaced with fakes for testing.
newPluginManager: newPluginManager, newPluginManager: newPluginManager,
getBackup: persistence.GetBackup, newBackupStore: persistence.NewObjectBackupStore,
downloadBackup: persistence.DownloadBackup,
uploadRestoreLog: persistence.UploadRestoreLog,
uploadRestoreResults: persistence.UploadRestoreResults,
} }
c.syncHandler = c.processRestore c.syncHandler = c.processRestore
@ -354,9 +347,8 @@ func (c *restoreController) processRestore(key string) error {
} }
type backupInfo struct { type backupInfo struct {
bucketName string
backup *api.Backup backup *api.Backup
objectStore cloudprovider.ObjectStore backupStore persistence.BackupStore
} }
func (c *restoreController) validateAndComplete(restore *api.Restore, pluginManager plugin.Manager) backupInfo { func (c *restoreController) validateAndComplete(restore *api.Restore, pluginManager plugin.Manager) backupInfo {
@ -469,9 +461,7 @@ func mostRecentCompletedBackup(backups []*api.Backup) *api.Backup {
// fetchBackupInfo checks the backup lister for a backup that matches the given name. If it doesn't // fetchBackupInfo checks the backup lister for a backup that matches the given name. If it doesn't
// find it, it tries to retrieve it from one of the backup storage locations. // find it, it tries to retrieve it from one of the backup storage locations.
func (c *restoreController) fetchBackupInfo(backupName string, pluginManager plugin.Manager) (backupInfo, error) { func (c *restoreController) fetchBackupInfo(backupName string, pluginManager plugin.Manager) (backupInfo, error) {
var info backupInfo backup, err := c.backupLister.Backups(c.namespace).Get(backupName)
var err error
info.backup, err = c.backupLister.Backups(c.namespace).Get(backupName)
if err != nil { if err != nil {
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
return backupInfo{}, errors.WithStack(err) return backupInfo{}, errors.WithStack(err)
@ -482,18 +472,20 @@ func (c *restoreController) fetchBackupInfo(backupName string, pluginManager plu
return c.fetchFromBackupStorage(backupName, pluginManager) return c.fetchFromBackupStorage(backupName, pluginManager)
} }
location, err := c.backupLocationLister.BackupStorageLocations(c.namespace).Get(info.backup.Spec.StorageLocation) location, err := c.backupLocationLister.BackupStorageLocations(c.namespace).Get(backup.Spec.StorageLocation)
if err != nil { if err != nil {
return backupInfo{}, errors.WithStack(err) return backupInfo{}, errors.WithStack(err)
} }
info.objectStore, err = getObjectStoreForLocation(location, pluginManager) backupStore, err := c.newBackupStore(location, pluginManager, c.logger)
if err != nil { if err != nil {
return backupInfo{}, errors.Wrap(err, "error initializing object store") return backupInfo{}, err
} }
info.bucketName = location.Spec.ObjectStorage.Bucket
return info, nil return backupInfo{
backup: backup,
backupStore: backupStore,
}, nil
} }
// fetchFromBackupStorage checks each backup storage location, starting with the default, // fetchFromBackupStorage checks each backup storage location, starting with the default,
@ -541,12 +533,12 @@ func orderedBackupLocations(locations []*api.BackupStorageLocation, defaultLocat
} }
func (c *restoreController) backupInfoForLocation(location *api.BackupStorageLocation, backupName string, pluginManager plugin.Manager) (backupInfo, error) { func (c *restoreController) backupInfoForLocation(location *api.BackupStorageLocation, backupName string, pluginManager plugin.Manager) (backupInfo, error) {
objectStore, err := getObjectStoreForLocation(location, pluginManager) backupStore, err := persistence.NewObjectBackupStore(location, pluginManager, c.logger)
if err != nil { if err != nil {
return backupInfo{}, err return backupInfo{}, err
} }
backup, err := c.getBackup(objectStore, location.Spec.ObjectStorage.Bucket, backupName) backup, err := backupStore.GetBackupMetadata(backupName)
if err != nil { if err != nil {
return backupInfo{}, err return backupInfo{}, err
} }
@ -562,9 +554,8 @@ func (c *restoreController) backupInfoForLocation(location *api.BackupStorageLoc
} }
return backupInfo{ return backupInfo{
bucketName: location.Spec.ObjectStorage.Bucket,
backup: backupCreated, backup: backupCreated,
objectStore: objectStore, backupStore: backupStore,
}, nil }, nil
} }
@ -603,7 +594,7 @@ func (c *restoreController) runRestore(
"backup": restore.Spec.BackupName, "backup": restore.Spec.BackupName,
}) })
backupFile, err := downloadToTempFile(info.objectStore, info.bucketName, restore.Spec.BackupName, c.downloadBackup, c.logger) backupFile, err := downloadToTempFile(restore.Spec.BackupName, info.backupStore, c.logger)
if err != nil { if err != nil {
logContext.WithError(err).Error("Error downloading backup") logContext.WithError(err).Error("Error downloading backup")
restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) restoreErrors.Ark = append(restoreErrors.Ark, err.Error())
@ -637,8 +628,8 @@ func (c *restoreController) runRestore(
return return
} }
if err := c.uploadRestoreLog(info.objectStore, info.bucketName, restore.Spec.BackupName, restore.Name, logFile); err != nil { if err := info.backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logFile); err != nil {
restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to object storage: %v", err)) restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to backup storage: %v", err))
} }
m := map[string]api.RestoreResult{ m := map[string]api.RestoreResult{
@ -658,20 +649,19 @@ func (c *restoreController) runRestore(
logContext.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0") logContext.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0")
return return
} }
if err := c.uploadRestoreResults(info.objectStore, info.bucketName, restore.Spec.BackupName, restore.Name, resultsFile); err != nil { if err := info.backupStore.PutRestoreResults(restore.Spec.BackupName, restore.Name, resultsFile); err != nil {
logContext.WithError(errors.WithStack(err)).Error("Error uploading results files to object storage") logContext.WithError(errors.WithStack(err)).Error("Error uploading results file to backup storage")
} }
return return
} }
func downloadToTempFile( func downloadToTempFile(
objectStore cloudprovider.ObjectStore, backupName string,
bucket, backupName string, backupStore persistence.BackupStore,
downloadBackup persistence.DownloadBackupFunc,
logger logrus.FieldLogger, logger logrus.FieldLogger,
) (*os.File, error) { ) (*os.File, error) {
readCloser, err := downloadBackup(objectStore, bucket, backupName) readCloser, err := backupStore.GetBackupContents(backupName)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -29,16 +29,18 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
api "github.com/heptio/ark/pkg/apis/ark/v1" api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions" informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/metrics" "github.com/heptio/ark/pkg/metrics"
"github.com/heptio/ark/pkg/persistence"
persistencemocks "github.com/heptio/ark/pkg/persistence/mocks"
"github.com/heptio/ark/pkg/plugin" "github.com/heptio/ark/pkg/plugin"
pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" pluginmocks "github.com/heptio/ark/pkg/plugin/mocks"
"github.com/heptio/ark/pkg/restore" "github.com/heptio/ark/pkg/restore"
@ -48,14 +50,14 @@ import (
func TestFetchBackupInfo(t *testing.T) { func TestFetchBackupInfo(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
backupName string backupName string
informerLocations []*api.BackupStorageLocation informerLocations []*api.BackupStorageLocation
informerBackups []*api.Backup informerBackups []*api.Backup
backupServiceBackup *api.Backup backupStoreBackup *api.Backup
backupServiceError error backupStoreError error
expectedRes *api.Backup expectedRes *api.Backup
expectedErr bool expectedErr bool
}{ }{
{ {
name: "lister has backup", name: "lister has backup",
@ -65,18 +67,18 @@ func TestFetchBackupInfo(t *testing.T) {
expectedRes: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup, expectedRes: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup,
}, },
{ {
name: "lister does not have a backup, but backupSvc does", name: "lister does not have a backup, but backupSvc does",
backupName: "backup-1", backupName: "backup-1",
backupServiceBackup: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup, backupStoreBackup: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup,
informerLocations: []*api.BackupStorageLocation{arktest.NewTestBackupStorageLocation().WithName("default").WithProvider("myCloud").WithObjectStorage("bucket").BackupStorageLocation}, informerLocations: []*api.BackupStorageLocation{arktest.NewTestBackupStorageLocation().WithName("default").WithProvider("myCloud").WithObjectStorage("bucket").BackupStorageLocation},
informerBackups: []*api.Backup{arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup}, informerBackups: []*api.Backup{arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup},
expectedRes: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup, expectedRes: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup,
}, },
{ {
name: "no backup", name: "no backup",
backupName: "backup-1", backupName: "backup-1",
backupServiceError: errors.New("no backup here"), backupStoreError: errors.New("no backup here"),
expectedErr: true, expectedErr: true,
}, },
} }
@ -88,11 +90,11 @@ func TestFetchBackupInfo(t *testing.T) {
sharedInformers = informers.NewSharedInformerFactory(client, 0) sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = arktest.NewLogger() logger = arktest.NewLogger()
pluginManager = &pluginmocks.Manager{} pluginManager = &pluginmocks.Manager{}
objectStore = &arktest.ObjectStore{} backupStore = &persistencemocks.BackupStore{}
) )
defer restorer.AssertExpectations(t) defer restorer.AssertExpectations(t)
defer objectStore.AssertExpectations(t) defer backupStore.AssertExpectations(t)
c := NewRestoreController( c := NewRestoreController(
api.DefaultNamespace, api.DefaultNamespace,
@ -110,10 +112,11 @@ func TestFetchBackupInfo(t *testing.T) {
metrics.NewServerMetrics(), metrics.NewServerMetrics(),
).(*restoreController) ).(*restoreController)
if test.backupServiceError == nil { c.newBackupStore = func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) {
pluginManager.On("GetObjectStore", "myCloud").Return(objectStore, nil) return backupStore, nil
objectStore.On("Init", mock.Anything).Return(nil) }
if test.backupStoreError == nil {
for _, itm := range test.informerLocations { for _, itm := range test.informerLocations {
sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(itm) sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(itm)
} }
@ -123,19 +126,23 @@ func TestFetchBackupInfo(t *testing.T) {
} }
} }
if test.backupServiceBackup != nil || test.backupServiceError != nil { if test.backupStoreBackup != nil && test.backupStoreError != nil {
c.getBackup = func(_ cloudprovider.ObjectStore, bucket, backup string) (*api.Backup, error) { panic("developer error - only one of backupStoreBackup, backupStoreError can be non-nil")
require.Equal(t, "bucket", bucket) }
require.Equal(t, test.backupName, backup)
return test.backupServiceBackup, test.backupServiceError if test.backupStoreError != nil {
} // TODO why do I need .Maybe() here?
backupStore.On("GetBackupMetadata", test.backupName).Return(nil, test.backupStoreError).Maybe()
}
if test.backupStoreBackup != nil {
// TODO why do I need .Maybe() here?
backupStore.On("GetBackupMetadata", test.backupName).Return(test.backupStoreBackup, nil).Maybe()
} }
info, err := c.fetchBackupInfo(test.backupName, pluginManager) info, err := c.fetchBackupInfo(test.backupName, pluginManager)
if assert.Equal(t, test.expectedErr, err != nil) { require.Equal(t, test.expectedErr, err != nil)
assert.Equal(t, test.expectedRes, info.backup) assert.Equal(t, test.expectedRes, info.backup)
}
}) })
} }
} }
@ -180,11 +187,7 @@ func TestProcessRestoreSkips(t *testing.T) {
restorer = &fakeRestorer{} restorer = &fakeRestorer{}
sharedInformers = informers.NewSharedInformerFactory(client, 0) sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = arktest.NewLogger() logger = arktest.NewLogger()
pluginManager = &pluginmocks.Manager{}
objectStore = &arktest.ObjectStore{}
) )
defer restorer.AssertExpectations(t)
defer objectStore.AssertExpectations(t)
c := NewRestoreController( c := NewRestoreController(
api.DefaultNamespace, api.DefaultNamespace,
@ -197,7 +200,7 @@ func TestProcessRestoreSkips(t *testing.T) {
false, // pvProviderExists false, // pvProviderExists
logger, logger,
logrus.InfoLevel, logrus.InfoLevel,
func(logrus.FieldLogger) plugin.Manager { return pluginManager }, nil,
"default", "default",
metrics.NewServerMetrics(), metrics.NewServerMetrics(),
).(*restoreController) ).(*restoreController)
@ -207,6 +210,7 @@ func TestProcessRestoreSkips(t *testing.T) {
} }
err := c.processRestore(test.restoreKey) err := c.processRestore(test.restoreKey)
assert.Equal(t, test.expectError, err != nil) assert.Equal(t, test.expectError, err != nil)
}) })
} }
@ -214,22 +218,22 @@ func TestProcessRestoreSkips(t *testing.T) {
func TestProcessRestore(t *testing.T) { func TestProcessRestore(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
restoreKey string restoreKey string
location *api.BackupStorageLocation location *api.BackupStorageLocation
restore *api.Restore restore *api.Restore
backup *api.Backup backup *api.Backup
restorerError error restorerError error
allowRestoreSnapshots bool allowRestoreSnapshots bool
expectedErr bool expectedErr bool
expectedPhase string expectedPhase string
expectedValidationErrors []string expectedValidationErrors []string
expectedRestoreErrors int expectedRestoreErrors int
expectedRestorerCall *api.Restore expectedRestorerCall *api.Restore
backupServiceGetBackupError error backupStoreGetBackupMetadataErr error
uploadLogError error backupStoreGetBackupContentsErr error
backupServiceDownloadBackupError error putRestoreLogErr error
expectedFinalPhase string expectedFinalPhase string
}{ }{
{ {
name: "restore with both namespace in both includedNamespaces and excludedNamespaces fails validation", name: "restore with both namespace in both includedNamespaces and excludedNamespaces fails validation",
@ -279,12 +283,12 @@ func TestProcessRestore(t *testing.T) {
expectedRestorerCall: NewRestore("foo", "bar", "backup-1", "ns-1", "", api.RestorePhaseInProgress).WithSchedule("sched-1").Restore, expectedRestorerCall: NewRestore("foo", "bar", "backup-1", "ns-1", "", api.RestorePhaseInProgress).WithSchedule("sched-1").Restore,
}, },
{ {
name: "restore with non-existent backup name fails", name: "restore with non-existent backup name fails",
restore: NewRestore("foo", "bar", "backup-1", "ns-1", "*", api.RestorePhaseNew).Restore, restore: NewRestore("foo", "bar", "backup-1", "ns-1", "*", api.RestorePhaseNew).Restore,
expectedErr: false, expectedErr: false,
expectedPhase: string(api.RestorePhaseFailedValidation), expectedPhase: string(api.RestorePhaseFailedValidation),
expectedValidationErrors: []string{"Error retrieving backup: not able to fetch from backup storage"}, expectedValidationErrors: []string{"Error retrieving backup: not able to fetch from backup storage"},
backupServiceGetBackupError: errors.New("no backup here"), backupStoreGetBackupMetadataErr: errors.New("no backup here"),
}, },
{ {
name: "restorer throwing an error causes the restore to fail", name: "restorer throwing an error causes the restore to fail",
@ -386,12 +390,12 @@ func TestProcessRestore(t *testing.T) {
}, },
}, },
{ {
name: "backup download error results in failed restore", name: "backup download error results in failed restore",
location: arktest.NewTestBackupStorageLocation().WithName("default").WithProvider("myCloud").WithObjectStorage("bucket").BackupStorageLocation, location: arktest.NewTestBackupStorageLocation().WithName("default").WithProvider("myCloud").WithObjectStorage("bucket").BackupStorageLocation,
restore: NewRestore(api.DefaultNamespace, "bar", "backup-1", "ns-1", "", api.RestorePhaseNew).Restore, restore: NewRestore(api.DefaultNamespace, "bar", "backup-1", "ns-1", "", api.RestorePhaseNew).Restore,
expectedPhase: string(api.RestorePhaseInProgress), expectedPhase: string(api.RestorePhaseInProgress),
expectedFinalPhase: string(api.RestorePhaseFailed), expectedFinalPhase: string(api.RestorePhaseFailed),
backupServiceDownloadBackupError: errors.New("Couldn't download backup"), backupStoreGetBackupContentsErr: errors.New("Couldn't download backup"),
backup: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup, backup: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("default").Backup,
}, },
} }
@ -404,11 +408,11 @@ func TestProcessRestore(t *testing.T) {
sharedInformers = informers.NewSharedInformerFactory(client, 0) sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = arktest.NewLogger() logger = arktest.NewLogger()
pluginManager = &pluginmocks.Manager{} pluginManager = &pluginmocks.Manager{}
objectStore = &arktest.ObjectStore{} backupStore = &persistencemocks.BackupStore{}
) )
defer restorer.AssertExpectations(t)
defer objectStore.AssertExpectations(t) defer restorer.AssertExpectations(t)
defer backupStore.AssertExpectations(t)
c := NewRestoreController( c := NewRestoreController(
api.DefaultNamespace, api.DefaultNamespace,
@ -426,13 +430,15 @@ func TestProcessRestore(t *testing.T) {
metrics.NewServerMetrics(), metrics.NewServerMetrics(),
).(*restoreController) ).(*restoreController)
c.newBackupStore = func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) {
return backupStore, nil
}
if test.location != nil { if test.location != nil {
sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(test.location) sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(test.location)
} }
if test.backup != nil { if test.backup != nil {
sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup) sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup)
pluginManager.On("GetObjectStore", "myCloud").Return(objectStore, nil)
objectStore.On("Init", mock.Anything).Return(nil)
} }
if test.restore != nil { if test.restore != nil {
@ -481,28 +487,17 @@ func TestProcessRestore(t *testing.T) {
if test.restorerError != nil { if test.restorerError != nil {
errors.Namespaces = map[string][]string{"ns-1": {test.restorerError.Error()}} errors.Namespaces = map[string][]string{"ns-1": {test.restorerError.Error()}}
} }
if test.uploadLogError != nil { if test.putRestoreLogErr != nil {
errors.Ark = append(errors.Ark, "error uploading log file to object storage: "+test.uploadLogError.Error()) errors.Ark = append(errors.Ark, "error uploading log file to object storage: "+test.putRestoreLogErr.Error())
} }
if test.expectedRestorerCall != nil { if test.expectedRestorerCall != nil {
c.downloadBackup = func(objectStore cloudprovider.ObjectStore, bucket, backup string) (io.ReadCloser, error) { backupStore.On("GetBackupContents", test.backup.Name).Return(ioutil.NopCloser(bytes.NewReader([]byte("hello world"))), nil)
require.Equal(t, test.backup.Name, backup)
return ioutil.NopCloser(bytes.NewReader([]byte("hello world"))), nil
}
restorer.On("Restore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(warnings, errors) restorer.On("Restore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(warnings, errors)
c.uploadRestoreLog = func(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, log io.Reader) error { backupStore.On("PutRestoreLog", test.backup.Name, test.restore.Name, mock.Anything).Return(test.putRestoreLogErr)
require.Equal(t, test.backup.Name, backup)
require.Equal(t, test.restore.Name, restore)
return test.uploadLogError
}
c.uploadRestoreResults = func(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, results io.Reader) error { backupStore.On("PutRestoreResults", test.backup.Name, test.restore.Name, mock.Anything).Return(nil)
require.Equal(t, test.backup.Name, backup)
require.Equal(t, test.restore.Name, restore)
return nil
}
} }
var ( var (
@ -516,20 +511,14 @@ func TestProcessRestore(t *testing.T) {
} }
} }
if test.backupServiceGetBackupError != nil { if test.backupStoreGetBackupMetadataErr != nil {
c.getBackup = func(_ cloudprovider.ObjectStore, bucket, backup string) (*api.Backup, error) { // TODO why do I need .Maybe() here?
require.Equal(t, "bucket", bucket) backupStore.On("GetBackupMetadata", test.restore.Spec.BackupName).Return(nil, test.backupStoreGetBackupMetadataErr).Maybe()
require.Equal(t, test.restore.Spec.BackupName, backup)
return nil, test.backupServiceGetBackupError
}
} }
if test.backupServiceDownloadBackupError != nil { if test.backupStoreGetBackupContentsErr != nil {
c.downloadBackup = func(_ cloudprovider.ObjectStore, bucket, backupName string) (io.ReadCloser, error) { // TODO why do I need .Maybe() here?
require.Equal(t, "bucket", bucket) backupStore.On("GetBackupContents", test.restore.Spec.BackupName).Return(nil, test.backupStoreGetBackupContentsErr).Maybe()
require.Equal(t, test.restore.Spec.BackupName, backupName)
return nil, test.backupServiceDownloadBackupError
}
} }
if test.restore != nil { if test.restore != nil {

View File

@ -0,0 +1,158 @@
// Code generated by mockery v1.0.0
package mocks
import io "io"
import mock "github.com/stretchr/testify/mock"
import v1 "github.com/heptio/ark/pkg/apis/ark/v1"
// BackupStore is an autogenerated mock type for the BackupStore type
type BackupStore struct {
mock.Mock
}
// DeleteBackup provides a mock function with given fields: name
func (_m *BackupStore) DeleteBackup(name string) error {
ret := _m.Called(name)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(name)
} else {
r0 = ret.Error(0)
}
return r0
}
// GetBackupContents provides a mock function with given fields: name
func (_m *BackupStore) GetBackupContents(name string) (io.ReadCloser, error) {
ret := _m.Called(name)
var r0 io.ReadCloser
if rf, ok := ret.Get(0).(func(string) io.ReadCloser); ok {
r0 = rf(name)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(io.ReadCloser)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(name)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetBackupMetadata provides a mock function with given fields: name
func (_m *BackupStore) GetBackupMetadata(name string) (*v1.Backup, error) {
ret := _m.Called(name)
var r0 *v1.Backup
if rf, ok := ret.Get(0).(func(string) *v1.Backup); ok {
r0 = rf(name)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*v1.Backup)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(name)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetDownloadURL provides a mock function with given fields: backup, target
func (_m *BackupStore) GetDownloadURL(backup string, target v1.DownloadTarget) (string, error) {
ret := _m.Called(backup, target)
var r0 string
if rf, ok := ret.Get(0).(func(string, v1.DownloadTarget) string); ok {
r0 = rf(backup, target)
} else {
r0 = ret.Get(0).(string)
}
var r1 error
if rf, ok := ret.Get(1).(func(string, v1.DownloadTarget) error); ok {
r1 = rf(backup, target)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ListBackups provides a mock function with given fields:
func (_m *BackupStore) ListBackups() ([]*v1.Backup, error) {
ret := _m.Called()
var r0 []*v1.Backup
if rf, ok := ret.Get(0).(func() []*v1.Backup); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*v1.Backup)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PutBackup provides a mock function with given fields: name, metadata, contents, log
func (_m *BackupStore) PutBackup(name string, metadata io.Reader, contents io.Reader, log io.Reader) error {
ret := _m.Called(name, metadata, contents, log)
var r0 error
if rf, ok := ret.Get(0).(func(string, io.Reader, io.Reader, io.Reader) error); ok {
r0 = rf(name, metadata, contents, log)
} else {
r0 = ret.Error(0)
}
return r0
}
// PutRestoreLog provides a mock function with given fields: backup, restore, log
func (_m *BackupStore) PutRestoreLog(backup string, restore string, log io.Reader) error {
ret := _m.Called(backup, restore, log)
var r0 error
if rf, ok := ret.Get(0).(func(string, string, io.Reader) error); ok {
r0 = rf(backup, restore, log)
} else {
r0 = ret.Error(0)
}
return r0
}
// PutRestoreResults provides a mock function with given fields: backup, restore, results
func (_m *BackupStore) PutRestoreResults(backup string, restore string, results io.Reader) error {
ret := _m.Called(backup, restore, results)
var r0 error
if rf, ok := ret.Get(0).(func(string, string, io.Reader) error); ok {
r0 = rf(backup, restore, results)
} else {
r0 = ret.Error(0)
}
return r0
}

View File

@ -1,267 +0,0 @@
/*
Copyright 2017 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistence
import (
"fmt"
"io"
"io/ioutil"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
kerrors "k8s.io/apimachinery/pkg/util/errors"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/generated/clientset/versioned/scheme"
)
// BackupLister knows how to list backups in object storage.
type BackupLister interface {
// ListBackups lists all the api.Backups in object storage for the given bucket.
ListBackups(bucket string) ([]*api.Backup, error)
}
const (
metadataFileFormatString = "%s/ark-backup.json"
backupFileFormatString = "%s/%s.tar.gz"
backupLogFileFormatString = "%s/%s-logs.gz"
restoreLogFileFormatString = "%s/restore-%s-logs.gz"
restoreResultsFileFormatString = "%s/restore-%s-results.gz"
)
func getMetadataKey(directory string) string {
return fmt.Sprintf(metadataFileFormatString, directory)
}
func getBackupContentsKey(directory, backup string) string {
return fmt.Sprintf(backupFileFormatString, directory, backup)
}
func getBackupLogKey(directory, backup string) string {
return fmt.Sprintf(backupLogFileFormatString, directory, backup)
}
func getRestoreLogKey(directory, restore string) string {
return fmt.Sprintf(restoreLogFileFormatString, directory, restore)
}
func getRestoreResultsKey(directory, restore string) string {
return fmt.Sprintf(restoreResultsFileFormatString, directory, restore)
}
func seekToBeginning(r io.Reader) error {
seeker, ok := r.(io.Seeker)
if !ok {
return nil
}
_, err := seeker.Seek(0, 0)
return err
}
func seekAndPutObject(objectStore cloudprovider.ObjectStore, bucket, key string, file io.Reader) error {
if file == nil {
return nil
}
if err := seekToBeginning(file); err != nil {
return errors.WithStack(err)
}
return objectStore.PutObject(bucket, key, file)
}
func UploadBackupLog(objectStore cloudprovider.ObjectStore, bucket, backupName string, log io.Reader) error {
logKey := getBackupLogKey(backupName, backupName)
return seekAndPutObject(objectStore, bucket, logKey, log)
}
func UploadBackupMetadata(objectStore cloudprovider.ObjectStore, bucket, backupName string, metadata io.Reader) error {
metadataKey := getMetadataKey(backupName)
return seekAndPutObject(objectStore, bucket, metadataKey, metadata)
}
func DeleteBackupMetadata(objectStore cloudprovider.ObjectStore, bucket, backupName string) error {
metadataKey := getMetadataKey(backupName)
return objectStore.DeleteObject(bucket, metadataKey)
}
func UploadBackupData(objectStore cloudprovider.ObjectStore, bucket, backupName string, backup io.Reader) error {
backupKey := getBackupContentsKey(backupName, backupName)
return seekAndPutObject(objectStore, bucket, backupKey, backup)
}
func UploadBackup(logger logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket, backupName string, metadata, backup, log io.Reader) error {
if err := UploadBackupLog(objectStore, bucket, backupName, log); err != nil {
// Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the
// backup's status.
logger.WithError(err).WithField("bucket", bucket).Error("Error uploading log file")
}
if metadata == nil {
// If we don't have metadata, something failed, and there's no point in continuing. An object
// storage bucket that is missing the metadata file can't be restored, nor can its logs be
// viewed.
return nil
}
// upload metadata file
if err := UploadBackupMetadata(objectStore, bucket, backupName, metadata); err != nil {
// failure to upload metadata file is a hard-stop
return err
}
// upload tar file
if err := UploadBackupData(objectStore, bucket, backupName, backup); err != nil {
// try to delete the metadata file since the data upload failed
deleteErr := DeleteBackupMetadata(objectStore, bucket, backupName)
return kerrors.NewAggregate([]error{err, deleteErr})
}
return nil
}
// DownloadBackupFunc is a function that can download backup metadata from a bucket in object storage.
type DownloadBackupFunc func(objectStore cloudprovider.ObjectStore, bucket, backupName string) (io.ReadCloser, error)
// DownloadBackup downloads an Ark backup with the specified object key from object storage via the cloud API.
// It returns the snapshot metadata and data (separately), or an error if a problem is encountered
// downloading or reading the file from the cloud API.
func DownloadBackup(objectStore cloudprovider.ObjectStore, bucket, backupName string) (io.ReadCloser, error) {
return objectStore.GetObject(bucket, getBackupContentsKey(backupName, backupName))
}
func ListBackups(logger logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket string) ([]*api.Backup, error) {
prefixes, err := objectStore.ListCommonPrefixes(bucket, "/")
if err != nil {
return nil, err
}
if len(prefixes) == 0 {
return []*api.Backup{}, nil
}
output := make([]*api.Backup, 0, len(prefixes))
for _, backupDir := range prefixes {
backup, err := GetBackup(objectStore, bucket, backupDir)
if err != nil {
logger.WithError(err).WithField("dir", backupDir).Error("Error reading backup directory")
continue
}
output = append(output, backup)
}
return output, nil
}
//GetBackupFunc is a function that can retrieve backup metadata from an object store
type GetBackupFunc func(objectStore cloudprovider.ObjectStore, bucket, backupName string) (*api.Backup, error)
// GetBackup gets the specified api.Backup from the given bucket in object storage.
func GetBackup(objectStore cloudprovider.ObjectStore, bucket, backupName string) (*api.Backup, error) {
key := getMetadataKey(backupName)
res, err := objectStore.GetObject(bucket, key)
if err != nil {
return nil, err
}
defer res.Close()
data, err := ioutil.ReadAll(res)
if err != nil {
return nil, errors.WithStack(err)
}
decoder := scheme.Codecs.UniversalDecoder(api.SchemeGroupVersion)
obj, _, err := decoder.Decode(data, nil, nil)
if err != nil {
return nil, errors.WithStack(err)
}
backup, ok := obj.(*api.Backup)
if !ok {
return nil, errors.Errorf("unexpected type for %s/%s: %T", bucket, key, obj)
}
return backup, nil
}
// DeleteBackupDirFunc is a function that can delete a backup directory from a bucket in object storage.
type DeleteBackupDirFunc func(logger logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket, backupName string) error
// DeleteBackupDir deletes all files in object storage for the given backup.
func DeleteBackupDir(logger logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket, backupName string) error {
objects, err := objectStore.ListObjects(bucket, backupName+"/")
if err != nil {
return err
}
var errs []error
for _, key := range objects {
logger.WithFields(logrus.Fields{
"bucket": bucket,
"key": key,
}).Debug("Trying to delete object")
if err := objectStore.DeleteObject(bucket, key); err != nil {
errs = append(errs, err)
}
}
return errors.WithStack(kerrors.NewAggregate(errs))
}
// CreateSignedURLFunc is a function that can create a signed URL for an object in object storage.
type CreateSignedURLFunc func(objectStore cloudprovider.ObjectStore, target api.DownloadTarget, bucket, directory string, ttl time.Duration) (string, error)
// CreateSignedURL creates a pre-signed URL that can be used to download a file from object
// storage. The URL expires after ttl.
func CreateSignedURL(objectStore cloudprovider.ObjectStore, target api.DownloadTarget, bucket, directory string, ttl time.Duration) (string, error) {
switch target.Kind {
case api.DownloadTargetKindBackupContents:
return objectStore.CreateSignedURL(bucket, getBackupContentsKey(directory, target.Name), ttl)
case api.DownloadTargetKindBackupLog:
return objectStore.CreateSignedURL(bucket, getBackupLogKey(directory, target.Name), ttl)
case api.DownloadTargetKindRestoreLog:
return objectStore.CreateSignedURL(bucket, getRestoreLogKey(directory, target.Name), ttl)
case api.DownloadTargetKindRestoreResults:
return objectStore.CreateSignedURL(bucket, getRestoreResultsKey(directory, target.Name), ttl)
default:
return "", errors.Errorf("unsupported download target kind %q", target.Kind)
}
}
// UploadRestoreLogFunc is a function that can upload a restore log to a bucket in object storage.
type UploadRestoreLogFunc func(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, log io.Reader) error
// UploadRestoreLog uploads the restore's log file to object storage.
func UploadRestoreLog(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, log io.Reader) error {
key := getRestoreLogKey(backup, restore)
return objectStore.PutObject(bucket, key, log)
}
// UploadRestoreResultsFunc is a function that can upload restore results to a bucket in object storage.
type UploadRestoreResultsFunc func(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, results io.Reader) error
// UploadRestoreResults uploads the restore's results file to object storage.
func UploadRestoreResults(objectStore cloudprovider.ObjectStore, bucket, backup, restore string, results io.Reader) error {
key := getRestoreResultsKey(backup, restore)
return objectStore.PutObject(bucket, key, results)
}

View File

@ -1,366 +0,0 @@
/*
Copyright 2017 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistence
import (
"bytes"
"encoding/json"
"errors"
"io"
"io/ioutil"
"strings"
"testing"
"time"
testutil "github.com/heptio/ark/pkg/util/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/util/encode"
arktest "github.com/heptio/ark/pkg/util/test"
)
func TestUploadBackup(t *testing.T) {
tests := []struct {
name string
metadata io.ReadSeeker
metadataError error
expectMetadataDelete bool
backup io.ReadSeeker
backupError error
expectBackupUpload bool
log io.ReadSeeker
logError error
expectedErr string
}{
{
name: "normal case",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
expectBackupUpload: true,
log: newStringReadSeeker("baz"),
},
{
name: "error on metadata upload does not upload data",
metadata: newStringReadSeeker("foo"),
metadataError: errors.New("md"),
log: newStringReadSeeker("baz"),
expectedErr: "md",
},
{
name: "error on data upload deletes metadata",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
expectBackupUpload: true,
backupError: errors.New("backup"),
expectMetadataDelete: true,
expectedErr: "backup",
},
{
name: "error on log upload is ok",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
expectBackupUpload: true,
log: newStringReadSeeker("baz"),
logError: errors.New("log"),
},
{
name: "don't upload data when metadata is nil",
backup: newStringReadSeeker("bar"),
log: newStringReadSeeker("baz"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
objectStore = &testutil.ObjectStore{}
bucket = "test-bucket"
backupName = "test-backup"
logger = arktest.NewLogger()
)
defer objectStore.AssertExpectations(t)
if test.metadata != nil {
objectStore.On("PutObject", bucket, backupName+"/ark-backup.json", test.metadata).Return(test.metadataError)
}
if test.backup != nil && test.expectBackupUpload {
objectStore.On("PutObject", bucket, backupName+"/"+backupName+".tar.gz", test.backup).Return(test.backupError)
}
if test.log != nil {
objectStore.On("PutObject", bucket, backupName+"/"+backupName+"-logs.gz", test.log).Return(test.logError)
}
if test.expectMetadataDelete {
objectStore.On("DeleteObject", bucket, backupName+"/ark-backup.json").Return(nil)
}
err := UploadBackup(logger, objectStore, bucket, backupName, test.metadata, test.backup, test.log)
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)
} else {
assert.NoError(t, err)
}
})
}
}
func TestDownloadBackup(t *testing.T) {
var (
objectStore = &testutil.ObjectStore{}
bucket = "b"
backup = "bak"
)
objectStore.On("GetObject", bucket, backup+"/"+backup+".tar.gz").Return(ioutil.NopCloser(strings.NewReader("foo")), nil)
rc, err := DownloadBackup(objectStore, bucket, backup)
require.NoError(t, err)
require.NotNil(t, rc)
data, err := ioutil.ReadAll(rc)
require.NoError(t, err)
assert.Equal(t, "foo", string(data))
objectStore.AssertExpectations(t)
}
func TestDeleteBackup(t *testing.T) {
tests := []struct {
name string
listObjectsError error
deleteErrors []error
expectedErr string
}{
{
name: "normal case",
},
{
name: "some delete errors, do as much as we can",
deleteErrors: []error{errors.New("a"), nil, errors.New("c")},
expectedErr: "[a, c]",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
bucket = "bucket"
backup = "bak"
objects = []string{"bak/ark-backup.json", "bak/bak.tar.gz", "bak/bak.log.gz"}
objectStore = &testutil.ObjectStore{}
logger = arktest.NewLogger()
)
objectStore.On("ListObjects", bucket, backup+"/").Return(objects, test.listObjectsError)
for i, obj := range objects {
var err error
if i < len(test.deleteErrors) {
err = test.deleteErrors[i]
}
objectStore.On("DeleteObject", bucket, obj).Return(err)
}
err := DeleteBackupDir(logger, objectStore, bucket, backup)
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)
} else {
assert.NoError(t, err)
}
objectStore.AssertExpectations(t)
})
}
}
func TestGetAllBackups(t *testing.T) {
tests := []struct {
name string
storageData map[string][]byte
expectedRes []*api.Backup
expectedErr string
}{
{
name: "normal case",
storageData: map[string][]byte{
"backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}),
"backup-2/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-2"}}),
},
expectedRes: []*api.Backup{
{
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
ObjectMeta: metav1.ObjectMeta{Name: "backup-1"},
},
{
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
ObjectMeta: metav1.ObjectMeta{Name: "backup-2"},
},
},
},
{
name: "backup that can't be decoded is ignored",
storageData: map[string][]byte{
"backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}),
"backup-2/ark-backup.json": []byte("this is not valid backup JSON"),
},
expectedRes: []*api.Backup{
{
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
ObjectMeta: metav1.ObjectMeta{Name: "backup-1"},
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
bucket = "bucket"
objectStore = &testutil.ObjectStore{}
logger = arktest.NewLogger()
)
objectStore.On("ListCommonPrefixes", bucket, "/").Return([]string{"backup-1", "backup-2"}, nil)
objectStore.On("GetObject", bucket, "backup-1/ark-backup.json").Return(ioutil.NopCloser(bytes.NewReader(test.storageData["backup-1/ark-backup.json"])), nil)
objectStore.On("GetObject", bucket, "backup-2/ark-backup.json").Return(ioutil.NopCloser(bytes.NewReader(test.storageData["backup-2/ark-backup.json"])), nil)
res, err := ListBackups(logger, objectStore, bucket)
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)
} else {
assert.NoError(t, err)
}
assert.Equal(t, test.expectedRes, res)
objectStore.AssertExpectations(t)
})
}
}
func TestCreateSignedURL(t *testing.T) {
tests := []struct {
name string
targetKind api.DownloadTargetKind
targetName string
directory string
expectedKey string
}{
{
name: "backup contents",
targetKind: api.DownloadTargetKindBackupContents,
targetName: "my-backup",
directory: "my-backup",
expectedKey: "my-backup/my-backup.tar.gz",
},
{
name: "backup log",
targetKind: api.DownloadTargetKindBackupLog,
targetName: "my-backup",
directory: "my-backup",
expectedKey: "my-backup/my-backup-logs.gz",
},
{
name: "scheduled backup contents",
targetKind: api.DownloadTargetKindBackupContents,
targetName: "my-backup-20170913154901",
directory: "my-backup-20170913154901",
expectedKey: "my-backup-20170913154901/my-backup-20170913154901.tar.gz",
},
{
name: "scheduled backup log",
targetKind: api.DownloadTargetKindBackupLog,
targetName: "my-backup-20170913154901",
directory: "my-backup-20170913154901",
expectedKey: "my-backup-20170913154901/my-backup-20170913154901-logs.gz",
},
{
name: "restore log",
targetKind: api.DownloadTargetKindRestoreLog,
targetName: "b-20170913154901",
directory: "b",
expectedKey: "b/restore-b-20170913154901-logs.gz",
},
{
name: "restore results",
targetKind: api.DownloadTargetKindRestoreResults,
targetName: "b-20170913154901",
directory: "b",
expectedKey: "b/restore-b-20170913154901-results.gz",
},
{
name: "restore results - backup has multiple dashes (e.g. restore of scheduled backup)",
targetKind: api.DownloadTargetKindRestoreResults,
targetName: "b-cool-20170913154901-20170913154902",
directory: "b-cool-20170913154901",
expectedKey: "b-cool-20170913154901/restore-b-cool-20170913154901-20170913154902-results.gz",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
objectStore = &testutil.ObjectStore{}
)
defer objectStore.AssertExpectations(t)
target := api.DownloadTarget{
Kind: test.targetKind,
Name: test.targetName,
}
objectStore.On("CreateSignedURL", "bucket", test.expectedKey, time.Duration(0)).Return("url", nil)
url, err := CreateSignedURL(objectStore, target, "bucket", test.directory, 0)
require.NoError(t, err)
assert.Equal(t, "url", url)
})
}
}
func jsonMarshal(obj interface{}) []byte {
res, err := json.Marshal(obj)
if err != nil {
panic(err)
}
return res
}
func encodeToBytes(obj runtime.Object) []byte {
res, err := encode.Encode(obj, "json")
if err != nil {
panic(err)
}
return res
}
type stringReadSeeker struct {
*strings.Reader
}
func newStringReadSeeker(s string) *stringReadSeeker {
return &stringReadSeeker{
Reader: strings.NewReader(s),
}
}
func (srs *stringReadSeeker) Seek(offset int64, whence int) (int64, error) {
return 0, nil
}

View File

@ -0,0 +1,299 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistence
import (
"fmt"
"io"
"io/ioutil"
"strings"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
kerrors "k8s.io/apimachinery/pkg/util/errors"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/generated/clientset/versioned/scheme"
)
// BackupStore defines operations for creating, retrieving, and deleting
// Ark backup and restore data in/from a persistent backup store.
type BackupStore interface {
ListBackups() ([]*arkv1api.Backup, error)
PutBackup(name string, metadata, contents, log io.Reader) error
GetBackupMetadata(name string) (*arkv1api.Backup, error)
GetBackupContents(name string) (io.ReadCloser, error)
DeleteBackup(name string) error
PutRestoreLog(backup, restore string, log io.Reader) error
PutRestoreResults(backup, restore string, results io.Reader) error
GetDownloadURL(backup string, target arkv1api.DownloadTarget) (string, error)
}
const (
// DownloadURLTTL is how long a download URL is valid for.
DownloadURLTTL = 10 * time.Minute
backupMetadataFileFormatString = "%s/ark-backup.json"
backupFileFormatString = "%s/%s.tar.gz"
backupLogFileFormatString = "%s/%s-logs.gz"
restoreLogFileFormatString = "%s/restore-%s-logs.gz"
restoreResultsFileFormatString = "%s/restore-%s-results.gz"
)
func getPrefix(prefix string) string {
if prefix == "" || strings.HasSuffix(prefix, "/") {
return prefix
}
return prefix + "/"
}
func getBackupMetadataKey(prefix, backup string) string {
return prefix + fmt.Sprintf(backupMetadataFileFormatString, backup)
}
func getBackupContentsKey(prefix, backup string) string {
return prefix + fmt.Sprintf(backupFileFormatString, backup, backup)
}
func getBackupLogKey(prefix, backup string) string {
return prefix + fmt.Sprintf(backupLogFileFormatString, backup, backup)
}
func getRestoreLogKey(prefix, backup, restore string) string {
return prefix + fmt.Sprintf(restoreLogFileFormatString, backup, restore)
}
func getRestoreResultsKey(prefix, backup, restore string) string {
return prefix + fmt.Sprintf(restoreResultsFileFormatString, backup, restore)
}
type objectBackupStore struct {
objectStore cloudprovider.ObjectStore
bucket string
prefix string
logger logrus.FieldLogger
}
// ObjectStoreGetter is a type that can get a cloudprovider.ObjectStore
// from a provider name.
type ObjectStoreGetter interface {
GetObjectStore(provider string) (cloudprovider.ObjectStore, error)
}
func NewObjectBackupStore(location *arkv1api.BackupStorageLocation, objectStoreGetter ObjectStoreGetter, logger logrus.FieldLogger) (BackupStore, error) {
if location.Spec.ObjectStorage == nil {
return nil, errors.New("backup storage location does not use object storage")
}
if location.Spec.Provider == "" {
return nil, errors.New("object storage provider name must not be empty")
}
objectStore, err := objectStoreGetter.GetObjectStore(location.Spec.Provider)
if err != nil {
return nil, err
}
// add the bucket name to the config map so that object stores can use
// it when initializing. The AWS object store uses this to determine the
// bucket's region when setting up its client.
if location.Spec.ObjectStorage != nil {
if location.Spec.Config == nil {
location.Spec.Config = make(map[string]string)
}
location.Spec.Config["bucket"] = location.Spec.ObjectStorage.Bucket
}
if err := objectStore.Init(location.Spec.Config); err != nil {
return nil, err
}
prefix := getPrefix(location.Spec.ObjectStorage.Prefix)
log := logger.WithFields(logrus.Fields(map[string]interface{}{
"bucket": location.Spec.ObjectStorage.Bucket,
"prefix": prefix,
}))
return &objectBackupStore{
objectStore: objectStore,
bucket: location.Spec.ObjectStorage.Bucket,
prefix: prefix,
logger: log,
}, nil
}
func (s *objectBackupStore) ListBackups() ([]*arkv1api.Backup, error) {
prefixes, err := s.objectStore.ListCommonPrefixes(s.bucket, s.prefix, "/")
if err != nil {
return nil, err
}
if len(prefixes) == 0 {
return []*arkv1api.Backup{}, nil
}
output := make([]*arkv1api.Backup, 0, len(prefixes))
for _, prefix := range prefixes {
// values returned from a call to cloudprovider.ObjectStore's
// ListcommonPrefixes method return the *full* prefix, inclusive
// of s.prefix, and include the delimiter ("/") as a suffix. Trim
// each of those off to get the backup name.
backupName := strings.TrimSuffix(strings.TrimPrefix(prefix, s.prefix), "/")
backup, err := s.GetBackupMetadata(backupName)
if err != nil {
s.logger.WithError(err).WithField("dir", backupName).Error("Error reading backup directory")
continue
}
output = append(output, backup)
}
return output, nil
}
func (s *objectBackupStore) PutBackup(name string, metadata io.Reader, contents io.Reader, log io.Reader) error {
if err := seekAndPutObject(s.objectStore, s.bucket, getBackupLogKey(s.prefix, name), log); err != nil {
// Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the
// backup's status.
s.logger.WithError(err).WithField("backup", name).Error("Error uploading log file")
}
if metadata == nil {
// If we don't have metadata, something failed, and there's no point in continuing. An object
// storage bucket that is missing the metadata file can't be restored, nor can its logs be
// viewed.
return nil
}
if err := seekAndPutObject(s.objectStore, s.bucket, getBackupMetadataKey(s.prefix, name), metadata); err != nil {
// failure to upload metadata file is a hard-stop
return err
}
if err := seekAndPutObject(s.objectStore, s.bucket, getBackupContentsKey(s.prefix, name), contents); err != nil {
deleteErr := s.objectStore.DeleteObject(s.bucket, getBackupMetadataKey(s.prefix, name))
return kerrors.NewAggregate([]error{err, deleteErr})
}
return nil
}
func (s *objectBackupStore) GetBackupMetadata(name string) (*arkv1api.Backup, error) {
key := getBackupMetadataKey(s.prefix, name)
res, err := s.objectStore.GetObject(s.bucket, key)
if err != nil {
return nil, err
}
defer res.Close()
data, err := ioutil.ReadAll(res)
if err != nil {
return nil, errors.WithStack(err)
}
decoder := scheme.Codecs.UniversalDecoder(arkv1api.SchemeGroupVersion)
obj, _, err := decoder.Decode(data, nil, nil)
if err != nil {
return nil, errors.WithStack(err)
}
backupObj, ok := obj.(*arkv1api.Backup)
if !ok {
return nil, errors.Errorf("unexpected type for %s/%s: %T", s.bucket, key, obj)
}
return backupObj, nil
}
func (s *objectBackupStore) GetBackupContents(name string) (io.ReadCloser, error) {
return s.objectStore.GetObject(s.bucket, getBackupContentsKey(s.prefix, name))
}
func (s *objectBackupStore) DeleteBackup(name string) error {
objects, err := s.objectStore.ListObjects(s.bucket, s.prefix+name+"/")
if err != nil {
return err
}
var errs []error
for _, key := range objects {
s.logger.WithFields(logrus.Fields{
"key": key,
}).Debug("Trying to delete object")
if err := s.objectStore.DeleteObject(s.bucket, key); err != nil {
errs = append(errs, err)
}
}
return errors.WithStack(kerrors.NewAggregate(errs))
}
func (s *objectBackupStore) PutRestoreLog(backup string, restore string, log io.Reader) error {
return s.objectStore.PutObject(s.bucket, getRestoreLogKey(s.prefix, backup, restore), log)
}
func (s *objectBackupStore) PutRestoreResults(backup string, restore string, results io.Reader) error {
return s.objectStore.PutObject(s.bucket, getRestoreResultsKey(s.prefix, backup, restore), results)
}
func (s *objectBackupStore) GetDownloadURL(backup string, target arkv1api.DownloadTarget) (string, error) {
switch target.Kind {
case arkv1api.DownloadTargetKindBackupContents:
return s.objectStore.CreateSignedURL(s.bucket, getBackupContentsKey(s.prefix, backup), DownloadURLTTL)
case arkv1api.DownloadTargetKindBackupLog:
return s.objectStore.CreateSignedURL(s.bucket, getBackupLogKey(s.prefix, backup), DownloadURLTTL)
case arkv1api.DownloadTargetKindRestoreLog:
return s.objectStore.CreateSignedURL(s.bucket, getRestoreLogKey(s.prefix, backup, target.Name), DownloadURLTTL)
case arkv1api.DownloadTargetKindRestoreResults:
return s.objectStore.CreateSignedURL(s.bucket, getRestoreResultsKey(s.prefix, backup, target.Name), DownloadURLTTL)
default:
return "", errors.Errorf("unsupported download target kind %q", target.Kind)
}
}
func seekToBeginning(r io.Reader) error {
seeker, ok := r.(io.Seeker)
if !ok {
return nil
}
_, err := seeker.Seek(0, 0)
return err
}
func seekAndPutObject(objectStore cloudprovider.ObjectStore, bucket, key string, file io.Reader) error {
if file == nil {
return nil
}
if err := seekToBeginning(file); err != nil {
return errors.WithStack(err)
}
return objectStore.PutObject(bucket, key, file)
}

View File

@ -0,0 +1,403 @@
/*
Copyright 2017 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistence
import (
"bytes"
"errors"
"io"
"io/ioutil"
"sort"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/util/encode"
arktest "github.com/heptio/ark/pkg/util/test"
)
type objectBackupStoreTestHarness struct {
// embedded to reduce verbosity when calling methods
*objectBackupStore
objectStore *cloudprovider.InMemoryObjectStore
bucket, prefix string
}
func newObjectBackupStoreTestHarness(bucket, prefix string) *objectBackupStoreTestHarness {
objectStore := cloudprovider.NewInMemoryObjectStore(bucket)
return &objectBackupStoreTestHarness{
objectBackupStore: &objectBackupStore{
objectStore: objectStore,
bucket: bucket,
prefix: prefix,
logger: arktest.NewLogger(),
},
objectStore: objectStore,
bucket: bucket,
prefix: prefix,
}
}
func TestListBackups(t *testing.T) {
tests := []struct {
name string
prefix string
storageData cloudprovider.BucketData
expectedRes []*api.Backup
expectedErr string
}{
{
name: "normal case",
storageData: map[string][]byte{
"backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}),
"backup-2/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-2"}}),
},
expectedRes: []*api.Backup{
{
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
ObjectMeta: metav1.ObjectMeta{Name: "backup-1"},
},
{
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
ObjectMeta: metav1.ObjectMeta{Name: "backup-2"},
},
},
},
{
name: "normal case with backup store prefix",
prefix: "ark-backups/",
storageData: map[string][]byte{
"ark-backups/backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}),
"ark-backups/backup-2/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-2"}}),
},
expectedRes: []*api.Backup{
{
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
ObjectMeta: metav1.ObjectMeta{Name: "backup-1"},
},
{
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
ObjectMeta: metav1.ObjectMeta{Name: "backup-2"},
},
},
},
{
name: "backup that can't be decoded is ignored",
storageData: map[string][]byte{
"backup-1/ark-backup.json": encodeToBytes(&api.Backup{ObjectMeta: metav1.ObjectMeta{Name: "backup-1"}}),
"backup-2/ark-backup.json": []byte("this is not valid backup JSON"),
},
expectedRes: []*api.Backup{
{
TypeMeta: metav1.TypeMeta{Kind: "Backup", APIVersion: "ark.heptio.com/v1"},
ObjectMeta: metav1.ObjectMeta{Name: "backup-1"},
},
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
harness := newObjectBackupStoreTestHarness("foo", tc.prefix)
for key, obj := range tc.storageData {
require.NoError(t, harness.objectStore.PutObject(harness.bucket, key, bytes.NewReader(obj)))
}
res, err := harness.ListBackups()
arktest.AssertErrorMatches(t, tc.expectedErr, err)
getComparer := func(obj []*api.Backup) func(i, j int) bool {
return func(i, j int) bool {
switch strings.Compare(obj[i].Namespace, obj[j].Namespace) {
case -1:
return true
case 1:
return false
default:
// namespaces are the same: compare by name
return obj[i].Name < obj[j].Name
}
}
}
sort.Slice(tc.expectedRes, getComparer(tc.expectedRes))
sort.Slice(res, getComparer(res))
assert.Equal(t, tc.expectedRes, res)
})
}
}
func TestPutBackup(t *testing.T) {
tests := []struct {
name string
prefix string
metadata io.Reader
contents io.Reader
log io.Reader
expectedErr string
expectedKeys []string
}{
{
name: "normal case",
metadata: newStringReadSeeker("metadata"),
contents: newStringReadSeeker("contents"),
log: newStringReadSeeker("log"),
expectedErr: "",
expectedKeys: []string{"backup-1/ark-backup.json", "backup-1/backup-1.tar.gz", "backup-1/backup-1-logs.gz"},
},
{
name: "normal case with backup store prefix",
prefix: "prefix-1/",
metadata: newStringReadSeeker("metadata"),
contents: newStringReadSeeker("contents"),
log: newStringReadSeeker("log"),
expectedErr: "",
expectedKeys: []string{"prefix-1/backup-1/ark-backup.json", "prefix-1/backup-1/backup-1.tar.gz", "prefix-1/backup-1/backup-1-logs.gz"},
},
{
name: "error on metadata upload does not upload data",
metadata: new(errorReader),
contents: newStringReadSeeker("contents"),
log: newStringReadSeeker("log"),
expectedErr: "error readers return errors",
expectedKeys: []string{"backup-1/backup-1-logs.gz"},
},
{
name: "error on data upload deletes metadata",
metadata: newStringReadSeeker("metadata"),
contents: new(errorReader),
log: newStringReadSeeker("log"),
expectedErr: "error readers return errors",
expectedKeys: []string{"backup-1/backup-1-logs.gz"},
},
{
name: "error on log upload is ok",
metadata: newStringReadSeeker("foo"),
contents: newStringReadSeeker("bar"),
log: new(errorReader),
expectedErr: "",
expectedKeys: []string{"backup-1/ark-backup.json", "backup-1/backup-1.tar.gz"},
},
{
name: "don't upload data when metadata is nil",
metadata: nil,
contents: newStringReadSeeker("contents"),
log: newStringReadSeeker("log"),
expectedErr: "",
expectedKeys: []string{"backup-1/backup-1-logs.gz"},
},
}
for _, tc := range tests {
harness := newObjectBackupStoreTestHarness("foo", tc.prefix)
err := harness.PutBackup("backup-1", tc.metadata, tc.contents, tc.log)
arktest.AssertErrorMatches(t, tc.expectedErr, err)
assert.Len(t, harness.objectStore.Data[harness.bucket], len(tc.expectedKeys))
for _, key := range tc.expectedKeys {
assert.Contains(t, harness.objectStore.Data[harness.bucket], key)
}
}
}
func TestGetBackupContents(t *testing.T) {
harness := newObjectBackupStoreTestHarness("test-bucket", "")
harness.objectStore.PutObject(harness.bucket, "test-backup/test-backup.tar.gz", newStringReadSeeker("foo"))
rc, err := harness.GetBackupContents("test-backup")
require.NoError(t, err)
require.NotNil(t, rc)
data, err := ioutil.ReadAll(rc)
require.NoError(t, err)
assert.Equal(t, "foo", string(data))
}
func TestDeleteBackup(t *testing.T) {
tests := []struct {
name string
prefix string
listObjectsError error
deleteErrors []error
expectedErr string
}{
{
name: "normal case",
},
{
name: "normal case with backup store prefix",
prefix: "ark-backups/",
},
{
name: "some delete errors, do as much as we can",
deleteErrors: []error{errors.New("a"), nil, errors.New("c")},
expectedErr: "[a, c]",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
objectStore := new(arktest.ObjectStore)
backupStore := &objectBackupStore{
objectStore: objectStore,
bucket: "test-bucket",
prefix: test.prefix,
logger: arktest.NewLogger(),
}
defer objectStore.AssertExpectations(t)
objects := []string{test.prefix + "bak/ark-backup.json", test.prefix + "bak/bak.tar.gz", test.prefix + "bak/bak.log.gz"}
objectStore.On("ListObjects", backupStore.bucket, test.prefix+"bak/").Return(objects, test.listObjectsError)
for i, obj := range objects {
var err error
if i < len(test.deleteErrors) {
err = test.deleteErrors[i]
}
objectStore.On("DeleteObject", backupStore.bucket, obj).Return(err)
}
err := backupStore.DeleteBackup("bak")
arktest.AssertErrorMatches(t, test.expectedErr, err)
})
}
}
func TestGetDownloadURL(t *testing.T) {
tests := []struct {
name string
targetKind api.DownloadTargetKind
targetName string
directory string
prefix string
expectedKey string
}{
{
name: "backup contents",
targetKind: api.DownloadTargetKindBackupContents,
targetName: "my-backup",
directory: "my-backup",
expectedKey: "my-backup/my-backup.tar.gz",
},
{
name: "backup log",
targetKind: api.DownloadTargetKindBackupLog,
targetName: "my-backup",
directory: "my-backup",
expectedKey: "my-backup/my-backup-logs.gz",
},
{
name: "scheduled backup contents",
targetKind: api.DownloadTargetKindBackupContents,
targetName: "my-backup-20170913154901",
directory: "my-backup-20170913154901",
expectedKey: "my-backup-20170913154901/my-backup-20170913154901.tar.gz",
},
{
name: "scheduled backup log",
targetKind: api.DownloadTargetKindBackupLog,
targetName: "my-backup-20170913154901",
directory: "my-backup-20170913154901",
expectedKey: "my-backup-20170913154901/my-backup-20170913154901-logs.gz",
},
{
name: "backup contents with backup store prefix",
targetKind: api.DownloadTargetKindBackupContents,
targetName: "my-backup",
directory: "my-backup",
prefix: "ark-backups/",
expectedKey: "ark-backups/my-backup/my-backup.tar.gz",
},
{
name: "restore log",
targetKind: api.DownloadTargetKindRestoreLog,
targetName: "b-20170913154901",
directory: "b",
expectedKey: "b/restore-b-20170913154901-logs.gz",
},
{
name: "restore results",
targetKind: api.DownloadTargetKindRestoreResults,
targetName: "b-20170913154901",
directory: "b",
expectedKey: "b/restore-b-20170913154901-results.gz",
},
{
name: "restore results - backup has multiple dashes (e.g. restore of scheduled backup)",
targetKind: api.DownloadTargetKindRestoreResults,
targetName: "b-cool-20170913154901-20170913154902",
directory: "b-cool-20170913154901",
expectedKey: "b-cool-20170913154901/restore-b-cool-20170913154901-20170913154902-results.gz",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
harness := newObjectBackupStoreTestHarness("test-bucket", test.prefix)
require.NoError(t, harness.objectStore.PutObject("test-bucket", test.expectedKey, newStringReadSeeker("foo")))
url, err := harness.GetDownloadURL(test.directory, api.DownloadTarget{Kind: test.targetKind, Name: test.targetName})
require.NoError(t, err)
assert.Equal(t, "a-url", url)
})
}
}
func encodeToBytes(obj runtime.Object) []byte {
res, err := encode.Encode(obj, "json")
if err != nil {
panic(err)
}
return res
}
type stringReadSeeker struct {
*strings.Reader
}
func newStringReadSeeker(s string) *stringReadSeeker {
return &stringReadSeeker{
Reader: strings.NewReader(s),
}
}
func (srs *stringReadSeeker) Seek(offset int64, whence int) (int64, error) {
return 0, nil
}
type errorReader struct{}
func (r *errorReader) Read([]byte) (int, error) {
return 0, errors.New("error readers return errors")
}

View File

@ -109,6 +109,7 @@ type ListCommonPrefixesRequest struct {
Plugin string `protobuf:"bytes,1,opt,name=plugin" json:"plugin,omitempty"` Plugin string `protobuf:"bytes,1,opt,name=plugin" json:"plugin,omitempty"`
Bucket string `protobuf:"bytes,2,opt,name=bucket" json:"bucket,omitempty"` Bucket string `protobuf:"bytes,2,opt,name=bucket" json:"bucket,omitempty"`
Delimiter string `protobuf:"bytes,3,opt,name=delimiter" json:"delimiter,omitempty"` Delimiter string `protobuf:"bytes,3,opt,name=delimiter" json:"delimiter,omitempty"`
Prefix string `protobuf:"bytes,4,opt,name=prefix" json:"prefix,omitempty"`
} }
func (m *ListCommonPrefixesRequest) Reset() { *m = ListCommonPrefixesRequest{} } func (m *ListCommonPrefixesRequest) Reset() { *m = ListCommonPrefixesRequest{} }
@ -137,6 +138,13 @@ func (m *ListCommonPrefixesRequest) GetDelimiter() string {
return "" return ""
} }
func (m *ListCommonPrefixesRequest) GetPrefix() string {
if m != nil {
return m.Prefix
}
return ""
}
type ListCommonPrefixesResponse struct { type ListCommonPrefixesResponse struct {
Prefixes []string `protobuf:"bytes,1,rep,name=prefixes" json:"prefixes,omitempty"` Prefixes []string `protobuf:"bytes,1,rep,name=prefixes" json:"prefixes,omitempty"`
} }
@ -637,34 +645,35 @@ var _ObjectStore_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("ObjectStore.proto", fileDescriptor2) } func init() { proto.RegisterFile("ObjectStore.proto", fileDescriptor2) }
var fileDescriptor2 = []byte{ var fileDescriptor2 = []byte{
// 459 bytes of a gzipped FileDescriptorProto // 468 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x51, 0x8b, 0xd3, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x51, 0x8b, 0xd3, 0x40,
0x10, 0x26, 0x26, 0x1e, 0x66, 0xae, 0x60, 0x9c, 0x83, 0x1a, 0x73, 0x2a, 0x75, 0x51, 0xa8, 0x08, 0x10, 0x26, 0x26, 0x1e, 0x66, 0xae, 0x60, 0x9c, 0x83, 0x1a, 0x73, 0x2a, 0x75, 0x51, 0xa8, 0x08,
0xe5, 0xd0, 0x17, 0x1f, 0x7c, 0x10, 0x4f, 0x11, 0xa1, 0xe0, 0x91, 0x2a, 0xfa, 0xe0, 0x4b, 0x7a, 0xe5, 0xd0, 0x17, 0x1f, 0x7c, 0x10, 0x4f, 0x11, 0xa1, 0xe0, 0x91, 0x2a, 0xfa, 0xe0, 0x4b, 0x7a,
0x19, 0x7b, 0xb1, 0x69, 0x12, 0x37, 0x13, 0x30, 0xff, 0xc0, 0x9f, 0x2d, 0xbb, 0x59, 0xeb, 0xa6, 0x19, 0x7b, 0xb1, 0x69, 0x12, 0x37, 0x13, 0x30, 0x8f, 0xbe, 0xf9, 0xb3, 0x65, 0x37, 0x6b, 0x6f,
0xd7, 0xf3, 0xa0, 0xf4, 0x6d, 0xe6, 0xdb, 0xf9, 0x66, 0xbe, 0xec, 0x7e, 0x13, 0xb8, 0xf3, 0x71, 0xd3, 0xeb, 0x79, 0x70, 0xf4, 0x6d, 0x66, 0x76, 0xbe, 0x99, 0x2f, 0xbb, 0xdf, 0x17, 0xb8, 0xf3,
0xfe, 0x83, 0xce, 0x79, 0xc6, 0xa5, 0xa4, 0x49, 0x25, 0x4b, 0x2e, 0xd1, 0x5f, 0x50, 0x41, 0x32, 0x71, 0xfe, 0x83, 0x4e, 0x79, 0xc6, 0xa5, 0xa4, 0x49, 0x25, 0x4b, 0x2e, 0xd1, 0x5f, 0x50, 0x41,
0x61, 0x4a, 0xa3, 0xc1, 0xec, 0x22, 0x91, 0x94, 0x76, 0x07, 0xe2, 0x02, 0x82, 0xb3, 0x86, 0x3b, 0x32, 0x61, 0x4a, 0xa3, 0xc1, 0xec, 0x2c, 0x91, 0x94, 0x76, 0x07, 0xe2, 0x0c, 0x82, 0x93, 0x86,
0x42, 0x4c, 0x3f, 0x1b, 0xaa, 0x19, 0x87, 0x70, 0x50, 0xe5, 0xcd, 0x22, 0x2b, 0x42, 0x67, 0xe4, 0x3b, 0x40, 0x4c, 0x3f, 0x1b, 0xaa, 0x19, 0x87, 0xb0, 0x57, 0xe5, 0xcd, 0x22, 0x2b, 0x42, 0x67,
0x8c, 0xfd, 0xd8, 0x64, 0x0a, 0x9f, 0x37, 0xe7, 0x4b, 0xe2, 0xf0, 0x46, 0x87, 0x77, 0x19, 0x06, 0xe4, 0x8c, 0xfd, 0xd8, 0x64, 0xaa, 0x3e, 0x6f, 0x4e, 0x97, 0xc4, 0xe1, 0x8d, 0xae, 0xde, 0x65,
0xe0, 0x2e, 0xa9, 0x0d, 0x5d, 0x0d, 0xaa, 0x10, 0x11, 0xbc, 0x79, 0x99, 0xb6, 0xa1, 0x37, 0x72, 0x18, 0x80, 0xbb, 0xa4, 0x36, 0x74, 0x75, 0x51, 0x85, 0x88, 0xe0, 0xcd, 0xcb, 0xb4, 0x0d, 0xbd,
0xc6, 0x83, 0x58, 0xc7, 0xe2, 0x13, 0x04, 0xef, 0x69, 0xdf, 0x93, 0xc4, 0x31, 0xdc, 0x7c, 0xd3, 0x91, 0x33, 0x1e, 0xc4, 0x3a, 0x16, 0x9f, 0x20, 0x78, 0x4f, 0xbb, 0xde, 0x24, 0x0e, 0xe1, 0xe6,
0x32, 0xd5, 0x6a, 0x64, 0x9a, 0x70, 0xa2, 0x1b, 0x0d, 0x62, 0x1d, 0x8b, 0x0c, 0xee, 0x4d, 0xb3, 0x9b, 0x96, 0xa9, 0x56, 0x2b, 0xd3, 0x84, 0x13, 0x3d, 0x68, 0x10, 0xeb, 0x58, 0xfc, 0x76, 0xe0,
0x9a, 0x4f, 0xcb, 0xd5, 0xaa, 0x2c, 0xce, 0x24, 0x7d, 0xcf, 0x7e, 0x51, 0xbd, 0xeb, 0xec, 0xfb, 0xde, 0x34, 0xab, 0xf9, 0xb8, 0x5c, 0xad, 0xca, 0xe2, 0x44, 0xd2, 0xf7, 0xec, 0x17, 0xd5, 0xd7,
0xe0, 0xa7, 0x94, 0x67, 0xab, 0x8c, 0x49, 0x1a, 0x05, 0xff, 0x00, 0xf1, 0x12, 0xa2, 0x6d, 0xa3, 0x5d, 0x7e, 0x1f, 0xfc, 0x94, 0xf2, 0x6c, 0x95, 0x31, 0x49, 0x43, 0xe1, 0xbc, 0xa0, 0xa7, 0xe9,
0xea, 0xaa, 0x2c, 0x6a, 0xc2, 0x08, 0x6e, 0x55, 0x06, 0x0b, 0x9d, 0x91, 0x3b, 0xf6, 0xe3, 0x75, 0x05, 0xfa, 0xa3, 0xd5, 0x34, 0x9d, 0x89, 0x97, 0x10, 0x6d, 0xa3, 0x50, 0x57, 0x65, 0x51, 0x13,
0x2e, 0xbe, 0x01, 0x2a, 0x66, 0x77, 0x31, 0x3b, 0xab, 0x53, 0xf5, 0xba, 0xa3, 0x91, 0x66, 0x32, 0x46, 0x70, 0xab, 0x32, 0xb5, 0xd0, 0x19, 0xb9, 0x63, 0x3f, 0x5e, 0xe7, 0xe2, 0x1b, 0xa0, 0x42,
0xf1, 0x14, 0x8e, 0x7a, 0xdd, 0x8d, 0x20, 0x04, 0x6f, 0x49, 0xed, 0x5f, 0x31, 0x3a, 0x16, 0x5f, 0x76, 0x37, 0x76, 0x6d, 0xd6, 0xe7, 0xbc, 0xdc, 0x1e, 0xaf, 0xa7, 0x70, 0xd0, 0x9b, 0x6e, 0x08,
0xe0, 0xe8, 0x2d, 0xe5, 0xc4, 0xb4, 0xef, 0x37, 0xca, 0x61, 0x78, 0x2a, 0x29, 0x61, 0x9a, 0x65, 0x21, 0x78, 0x4b, 0x6a, 0xff, 0x91, 0xd1, 0xb1, 0xf8, 0x02, 0x07, 0x6f, 0x29, 0x27, 0xa6, 0x5d,
0x8b, 0x82, 0xd2, 0xcf, 0xf1, 0x74, 0x7f, 0x4e, 0x0b, 0xc0, 0x65, 0xce, 0xb5, 0xd1, 0xdc, 0x58, 0x3f, 0x5e, 0x0e, 0xc3, 0x63, 0x49, 0x09, 0xd3, 0x2c, 0x5b, 0x14, 0x94, 0x7e, 0x8e, 0xa7, 0xbb,
0x85, 0xe2, 0x19, 0xdc, 0xbd, 0x34, 0xcd, 0x7c, 0x75, 0x00, 0x6e, 0x23, 0x73, 0x33, 0x4b, 0x85, 0x93, 0x60, 0x00, 0x2e, 0x73, 0xae, 0x1f, 0xc3, 0x8d, 0x55, 0x28, 0x9e, 0xc1, 0xdd, 0x0b, 0xdb,
0xcf, 0x7f, 0x7b, 0x70, 0x68, 0x6d, 0x0b, 0x9e, 0x80, 0xf7, 0xa1, 0xc8, 0x18, 0x87, 0x93, 0xf5, 0xcc, 0x57, 0x07, 0xe0, 0x36, 0x32, 0x37, 0xbb, 0x54, 0xf8, 0xfc, 0x8f, 0x07, 0xfb, 0x96, 0x8d,
0xc2, 0x4c, 0x14, 0x60, 0x04, 0x47, 0x81, 0x85, 0xbf, 0x5b, 0x55, 0xdc, 0xe2, 0x2b, 0xf0, 0xd7, 0xf0, 0x08, 0xbc, 0x0f, 0x45, 0xc6, 0x38, 0x9c, 0xac, 0x9d, 0x34, 0x51, 0x05, 0x43, 0x38, 0x0a,
0x0b, 0x84, 0xc7, 0xd6, 0xf1, 0xe6, 0x5a, 0x5d, 0xe6, 0x8e, 0x1d, 0xc5, 0x5e, 0x2f, 0x45, 0x8f, 0xac, 0xfa, 0xbb, 0x55, 0xc5, 0x2d, 0xbe, 0x02, 0x7f, 0xed, 0x2c, 0x3c, 0xb4, 0x8e, 0x37, 0xfd,
0xbd, 0xb9, 0x2a, 0x3d, 0xb6, 0x76, 0xfc, 0x89, 0x83, 0x49, 0x67, 0x9d, 0xbe, 0xe9, 0xf0, 0xb1, 0x76, 0x11, 0x3b, 0x76, 0x14, 0x7a, 0xed, 0x96, 0x1e, 0x7a, 0xd3, 0x43, 0x3d, 0xb4, 0xb6, 0xc2,
0x55, 0x79, 0xa5, 0xfd, 0xa3, 0x27, 0xd7, 0x54, 0x99, 0x2b, 0x9b, 0xc2, 0xa1, 0xe5, 0x1f, 0x7c, 0x91, 0x83, 0x49, 0x27, 0x9d, 0xbe, 0xe8, 0xf0, 0xb1, 0xd5, 0x79, 0xa9, 0x2d, 0xa2, 0x27, 0x57,
0xb0, 0xc1, 0xea, 0xbb, 0x36, 0x7a, 0x78, 0xd5, 0xb1, 0xe9, 0xf6, 0x1a, 0x06, 0xb6, 0xc5, 0xd0, 0x74, 0x99, 0x2b, 0x9b, 0xc2, 0xbe, 0xa5, 0x1f, 0x7c, 0xb0, 0x81, 0xea, 0xab, 0x36, 0x7a, 0x78,
0xae, 0xdf, 0xe2, 0xbd, 0x2d, 0xd7, 0xfd, 0x15, 0x6e, 0x6f, 0xbc, 0x2e, 0x3e, 0xb2, 0x8a, 0xb6, 0xd9, 0xb1, 0x99, 0xf6, 0x1a, 0x06, 0xb6, 0xc4, 0xd0, 0xee, 0xdf, 0xa2, 0xbd, 0x2d, 0xd7, 0xfd,
0xfb, 0x2c, 0x12, 0xff, 0x2b, 0xe9, 0xb4, 0xcd, 0x0f, 0xf4, 0x0f, 0xf1, 0xc5, 0x9f, 0x00, 0x00, 0x15, 0x6e, 0x6f, 0xbc, 0x2e, 0x3e, 0xb2, 0x9a, 0xb6, 0xeb, 0x2c, 0x12, 0xff, 0x6b, 0xe9, 0xb8,
0x00, 0xff, 0xff, 0x59, 0xaf, 0x2a, 0xa0, 0x3e, 0x05, 0x00, 0x00, 0xcd, 0xf7, 0xf4, 0x9f, 0xf2, 0xc5, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x54, 0xac, 0xfe, 0xa7,
0x57, 0x05, 0x00, 0x00,
} }

View File

@ -132,10 +132,17 @@ func (c *ObjectStoreGRPCClient) GetObject(bucket, key string) (io.ReadCloser, er
} }
// ListCommonPrefixes gets a list of all object key prefixes that come // ListCommonPrefixes gets a list of all object key prefixes that come
// before the provided delimiter (this is often used to simulate a directory // after the provided prefix and before the provided delimiter (this is
// hierarchy in object storage). // often used to simulate a directory hierarchy in object storage).
func (c *ObjectStoreGRPCClient) ListCommonPrefixes(bucket, delimiter string) ([]string, error) { func (c *ObjectStoreGRPCClient) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
res, err := c.grpcClient.ListCommonPrefixes(context.Background(), &proto.ListCommonPrefixesRequest{Plugin: c.plugin, Bucket: bucket, Delimiter: delimiter}) req := &proto.ListCommonPrefixesRequest{
Plugin: c.plugin,
Bucket: bucket,
Prefix: prefix,
Delimiter: delimiter,
}
res, err := c.grpcClient.ListCommonPrefixes(context.Background(), req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -294,16 +301,16 @@ func (s *ObjectStoreGRPCServer) GetObject(req *proto.GetObjectRequest, stream pr
} }
} }
// ListCommonPrefixes gets a list of all object key prefixes that come // ListCommonPrefixes gets a list of all object key prefixes that start with
// before the provided delimiter (this is often used to simulate a directory // the specified prefix and stop at the next instance of the provided delimiter
// hierarchy in object storage). // (this is often used to simulate a directory hierarchy in object storage).
func (s *ObjectStoreGRPCServer) ListCommonPrefixes(ctx context.Context, req *proto.ListCommonPrefixesRequest) (*proto.ListCommonPrefixesResponse, error) { func (s *ObjectStoreGRPCServer) ListCommonPrefixes(ctx context.Context, req *proto.ListCommonPrefixesRequest) (*proto.ListCommonPrefixesResponse, error) {
impl, err := s.getImpl(req.Plugin) impl, err := s.getImpl(req.Plugin)
if err != nil { if err != nil {
return nil, err return nil, err
} }
prefixes, err := impl.ListCommonPrefixes(req.Bucket, req.Delimiter) prefixes, err := impl.ListCommonPrefixes(req.Bucket, req.Prefix, req.Delimiter)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -24,6 +24,7 @@ message ListCommonPrefixesRequest {
string plugin = 1; string plugin = 1;
string bucket = 2; string bucket = 2;
string delimiter = 3; string delimiter = 3;
string prefix = 4;
} }
message ListCommonPrefixesResponse { message ListCommonPrefixesResponse {

View File

@ -127,12 +127,12 @@ func (r *restartableObjectStore) GetObject(bucket string, key string) (io.ReadCl
} }
// ListCommonPrefixes restarts the plugin's process if needed, then delegates the call. // ListCommonPrefixes restarts the plugin's process if needed, then delegates the call.
func (r *restartableObjectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) { func (r *restartableObjectStore) ListCommonPrefixes(bucket string, prefix string, delimiter string) ([]string, error) {
delegate, err := r.getDelegate() delegate, err := r.getDelegate()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return delegate.ListCommonPrefixes(bucket, delimiter) return delegate.ListCommonPrefixes(bucket, prefix, delimiter)
} }
// ListObjects restarts the plugin's process if needed, then delegates the call. // ListObjects restarts the plugin's process if needed, then delegates the call.

View File

@ -208,7 +208,7 @@ func TestRestartableObjectStoreDelegatedFunctions(t *testing.T) {
}, },
restartableDelegateTest{ restartableDelegateTest{
function: "ListCommonPrefixes", function: "ListCommonPrefixes",
inputs: []interface{}{"bucket", "delimeter"}, inputs: []interface{}{"bucket", "prefix", "delimiter"},
expectedErrorOutputs: []interface{}{([]string)(nil), errors.Errorf("reset error")}, expectedErrorOutputs: []interface{}{([]string)(nil), errors.Errorf("reset error")},
expectedDelegateOutputs: []interface{}{[]string{"a", "b"}, errors.Errorf("delegate error")}, expectedDelegateOutputs: []interface{}{[]string{"a", "b"}, errors.Errorf("delegate error")},
}, },

View File

@ -110,3 +110,13 @@ func AssertDeepEqual(t *testing.T, expected, actual interface{}) bool {
return true return true
} }
// AssertErrorMatches asserts that if expected is the empty string, actual
// is nil, otherwise, that actual's error string matches expected.
func AssertErrorMatches(t *testing.T, expected string, actual error) bool {
if expected != "" {
return assert.EqualError(t, actual, expected)
}
return assert.NoError(t, actual)
}

View File

@ -98,13 +98,13 @@ func (_m *ObjectStore) Init(config map[string]string) error {
return r0 return r0
} }
// ListCommonPrefixes provides a mock function with given fields: bucket, delimiter // ListCommonPrefixes provides a mock function with given fields: bucket, prefix, delimiter
func (_m *ObjectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) { func (_m *ObjectStore) ListCommonPrefixes(bucket string, prefix string, delimiter string) ([]string, error) {
ret := _m.Called(bucket, delimiter) ret := _m.Called(bucket, prefix, delimiter)
var r0 []string var r0 []string
if rf, ok := ret.Get(0).(func(string, string) []string); ok { if rf, ok := ret.Get(0).(func(string, string, string) []string); ok {
r0 = rf(bucket, delimiter) r0 = rf(bucket, prefix, delimiter)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).([]string) r0 = ret.Get(0).([]string)
@ -112,8 +112,8 @@ func (_m *ObjectStore) ListCommonPrefixes(bucket string, delimiter string) ([]st
} }
var r1 error var r1 error
if rf, ok := ret.Get(1).(func(string, string) error); ok { if rf, ok := ret.Get(1).(func(string, string, string) error); ok {
r1 = rf(bucket, delimiter) r1 = rf(bucket, prefix, delimiter)
} else { } else {
r1 = ret.Error(1) r1 = ret.Error(1)
} }