switch UploadBackup, UploadRestoreLog, PutObject to take io.Reader
Signed-off-by: Steve Kriss <steve@heptio.com>pull/174/head
parent
f28d008017
commit
e7703d88ec
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/endpoints"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/heptio/ark/pkg/cloudprovider"
|
||||
|
@ -31,8 +32,9 @@ import (
|
|||
var _ cloudprovider.ObjectStorageAdapter = &objectStorageAdapter{}
|
||||
|
||||
type objectStorageAdapter struct {
|
||||
s3 *s3.S3
|
||||
kmsKeyID string
|
||||
s3 *s3.S3
|
||||
s3Uploader *s3manager.Uploader
|
||||
kmsKeyID string
|
||||
}
|
||||
|
||||
func NewObjectStorageAdapter(region, s3URL, kmsKeyID string, s3ForcePathStyle bool) (cloudprovider.ObjectStorageAdapter, error) {
|
||||
|
@ -64,13 +66,14 @@ func NewObjectStorageAdapter(region, s3URL, kmsKeyID string, s3ForcePathStyle bo
|
|||
}
|
||||
|
||||
return &objectStorageAdapter{
|
||||
s3: s3.New(sess),
|
||||
kmsKeyID: kmsKeyID,
|
||||
s3: s3.New(sess),
|
||||
s3Uploader: s3manager.NewUploader(sess),
|
||||
kmsKeyID: kmsKeyID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (op *objectStorageAdapter) PutObject(bucket string, key string, body io.ReadSeeker) error {
|
||||
req := &s3.PutObjectInput{
|
||||
func (op *objectStorageAdapter) PutObject(bucket string, key string, body io.Reader) error {
|
||||
req := &s3manager.UploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: &key,
|
||||
Body: body,
|
||||
|
@ -82,7 +85,7 @@ func (op *objectStorageAdapter) PutObject(bucket string, key string, body io.Rea
|
|||
req.SSEKMSKeyId = &op.kmsKeyID
|
||||
}
|
||||
|
||||
_, err := op.s3.PutObject(req)
|
||||
_, err := op.s3Uploader.Upload(req)
|
||||
|
||||
return errors.Wrapf(err, "error putting object %s", key)
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ func NewObjectStorageAdapter() (cloudprovider.ObjectStorageAdapter, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (op *objectStorageAdapter) PutObject(bucket string, key string, body io.ReadSeeker) error {
|
||||
func (op *objectStorageAdapter) PutObject(bucket string, key string, body io.Reader) error {
|
||||
container, err := getContainerReference(op.blobClient, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -61,19 +61,6 @@ func (op *objectStorageAdapter) PutObject(bucket string, key string, body io.Rea
|
|||
return err
|
||||
}
|
||||
|
||||
// TODO having to seek to end/back to beginning to get
|
||||
// length here is ugly. refactor to make this better.
|
||||
len, err := body.Seek(0, io.SeekEnd)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
blob.Properties.ContentLength = len
|
||||
|
||||
if _, err := body.Seek(0, 0); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return errors.WithStack(blob.CreateBlockBlobFromReader(body, nil))
|
||||
}
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ type BackupService interface {
|
|||
// UploadBackup uploads the specified Ark backup of a set of Kubernetes API objects, whose manifests are
|
||||
// stored in the specified file, into object storage in an Ark bucket, tagged with Ark metadata. Returns
|
||||
// an error if a problem is encountered accessing the file or performing the upload via the cloud API.
|
||||
UploadBackup(bucket, name string, metadata, backup, log io.ReadSeeker) error
|
||||
UploadBackup(bucket, name string, metadata, backup, log io.Reader) error
|
||||
|
||||
// DownloadBackup downloads an Ark backup with the specified object key from object storage via the cloud API.
|
||||
// It returns the snapshot metadata and data (separately), or an error if a problem is encountered
|
||||
|
@ -58,7 +58,7 @@ type BackupService interface {
|
|||
CreateSignedURL(target api.DownloadTarget, bucket string, ttl time.Duration) (string, error)
|
||||
|
||||
// UploadRestoreLog uploads the restore's log file to object storage.
|
||||
UploadRestoreLog(bucket, backup, restore string, log io.ReadSeeker) error
|
||||
UploadRestoreLog(bucket, backup, restore string, log io.Reader) error
|
||||
}
|
||||
|
||||
// BackupGetter knows how to list backups in object storage.
|
||||
|
@ -108,7 +108,7 @@ func NewBackupService(objectStorage ObjectStorageAdapter, logger *logrus.Logger)
|
|||
}
|
||||
}
|
||||
|
||||
func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.ReadSeeker) error {
|
||||
func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.Reader) error {
|
||||
// upload metadata file
|
||||
metadataKey := getMetadataKey(backupName)
|
||||
if err := br.objectStorage.PutObject(bucket, metadataKey, metadata); err != nil {
|
||||
|
@ -231,7 +231,7 @@ func (br *backupService) CreateSignedURL(target api.DownloadTarget, bucket strin
|
|||
}
|
||||
}
|
||||
|
||||
func (br *backupService) UploadRestoreLog(bucket, backup, restore string, log io.ReadSeeker) error {
|
||||
func (br *backupService) UploadRestoreLog(bucket, backup, restore string, log io.Reader) error {
|
||||
key := getRestoreLogKey(backup, restore)
|
||||
return br.objectStorage.PutObject(bucket, key, log)
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ func NewObjectStorageAdapter(googleAccessID string, privateKey []byte) (cloudpro
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (op *objectStorageAdapter) PutObject(bucket string, key string, body io.ReadSeeker) error {
|
||||
func (op *objectStorageAdapter) PutObject(bucket string, key string, body io.Reader) error {
|
||||
obj := &storage.Object{
|
||||
Name: key,
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ import (
|
|||
type ObjectStorageAdapter interface {
|
||||
// PutObject creates a new object using the data in body within the specified
|
||||
// object storage bucket with the given key.
|
||||
PutObject(bucket string, key string, body io.ReadSeeker) error
|
||||
PutObject(bucket string, key string, body io.Reader) error
|
||||
|
||||
// GetObject retrieves the object with the given key from the specified
|
||||
// bucket in object storage.
|
||||
|
|
|
@ -338,5 +338,5 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string)
|
|||
return errors.Wrap(err, "error resetting Backup log file offset")
|
||||
}
|
||||
|
||||
return controller.backupService.UploadBackup(bucket, backup.Name, bytes.NewReader(buf.Bytes()), backupFile, logFile)
|
||||
return controller.backupService.UploadBackup(bucket, backup.Name, buf, backupFile, logFile)
|
||||
}
|
||||
|
|
|
@ -132,11 +132,11 @@ func (_m *BackupService) GetBackup(bucket string, name string) (*v1.Backup, erro
|
|||
}
|
||||
|
||||
// UploadBackup provides a mock function with given fields: bucket, name, metadata, backup, log
|
||||
func (_m *BackupService) UploadBackup(bucket string, name string, metadata io.ReadSeeker, backup io.ReadSeeker, log io.ReadSeeker) error {
|
||||
func (_m *BackupService) UploadBackup(bucket string, name string, metadata, backup, log io.Reader) error {
|
||||
ret := _m.Called(bucket, name, metadata, backup, log)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(string, string, io.ReadSeeker, io.ReadSeeker, io.ReadSeeker) error); ok {
|
||||
if rf, ok := ret.Get(0).(func(string, string, io.Reader, io.Reader, io.Reader) error); ok {
|
||||
r0 = rf(bucket, name, metadata, backup, log)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
|
@ -146,11 +146,11 @@ func (_m *BackupService) UploadBackup(bucket string, name string, metadata io.Re
|
|||
}
|
||||
|
||||
// UploadRestoreLog provides a mock function with given fields: bucket, backup, restore, log
|
||||
func (_m *BackupService) UploadRestoreLog(bucket string, backup string, restore string, log io.ReadSeeker) error {
|
||||
func (_m *BackupService) UploadRestoreLog(bucket string, backup string, restore string, log io.Reader) error {
|
||||
ret := _m.Called(bucket, backup, restore, log)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(string, string, string, io.ReadSeeker) error); ok {
|
||||
if rf, ok := ret.Get(0).(func(string, string, string, io.Reader) error); ok {
|
||||
r0 = rf(bucket, backup, restore, log)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
|
|
|
@ -41,7 +41,7 @@ func (f *FakeBackupService) GetAllBackups(bucket string) ([]*v1.Backup, error) {
|
|||
return backups, args.Error(1)
|
||||
}
|
||||
|
||||
func (f *FakeBackupService) UploadBackup(bucket, name string, metadata, backup io.ReadSeeker) error {
|
||||
func (f *FakeBackupService) UploadBackup(bucket, name string, metadata, backup io.Reader) error {
|
||||
args := f.Called(bucket, name, metadata, backup)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
|
|
@ -131,11 +131,11 @@ func (_m *ObjectStorageAdapter) ListObjects(bucket string, prefix string) ([]str
|
|||
}
|
||||
|
||||
// PutObject provides a mock function with given fields: bucket, key, body
|
||||
func (_m *ObjectStorageAdapter) PutObject(bucket string, key string, body io.ReadSeeker) error {
|
||||
func (_m *ObjectStorageAdapter) PutObject(bucket string, key string, body io.Reader) error {
|
||||
ret := _m.Called(bucket, key, body)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(string, string, io.ReadSeeker) error); ok {
|
||||
if rf, ok := ret.Get(0).(func(string, string, io.Reader) error); ok {
|
||||
r0 = rf(bucket, key, body)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
|
|
Loading…
Reference in New Issue