Merge pull request #7452 from Lyndon-Li/issue-fix-7211

Issue 7211: support concatenate objects
pull/7505/head
lyndon-li 2024-03-04 10:34:37 +08:00 committed by GitHub
commit 97d276caa7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 404 additions and 39 deletions

View File

@ -0,0 +1 @@
Fix issue #7211. Enable advanced feature capability and add support to concatenate objects for unified repo.

View File

@ -366,6 +366,39 @@ func (kr *kopiaRepository) Flush(ctx context.Context) error {
return nil
}
func (kr *kopiaRepository) GetAdvancedFeatures() udmrepo.AdvancedFeatureInfo {
return udmrepo.AdvancedFeatureInfo{
MultiPartBackup: true,
}
}
func (kr *kopiaRepository) ConcatenateObjects(ctx context.Context, objectIDs []udmrepo.ID) (udmrepo.ID, error) {
if kr.rawWriter == nil {
return "", errors.New("repo writer is closed or not open")
}
if len(objectIDs) == 0 {
return udmrepo.ID(""), errors.New("object list is empty")
}
rawIDs := []object.ID{}
for _, id := range objectIDs {
rawID, err := object.ParseID(string(id))
if err != nil {
return udmrepo.ID(""), errors.Wrapf(err, "error to parse object ID from %v", id)
}
rawIDs = append(rawIDs, rawID)
}
result, err := kr.rawWriter.ConcatenateObjects(ctx, rawIDs)
if err != nil {
return udmrepo.ID(""), errors.Wrap(err, "error to concatenate objects")
}
return udmrepo.ID(result.String()), nil
}
// updateProgress is called when the repository writes a piece of blob data to the storage during data write
func (kr *kopiaRepository) updateProgress(uploaded int64) {
total := atomic.AddInt64(&kr.uploaded, uploaded)

View File

@ -716,6 +716,78 @@ func TestFlush(t *testing.T) {
}
}
func TestConcatenateObjects(t *testing.T) {
testCases := []struct {
name string
setWriter bool
rawWriter *repomocks.DirectRepositoryWriter
rawWriterRetErr error
objectIDs []udmrepo.ID
expectedErr string
}{
{
name: "writer is nil",
expectedErr: "repo writer is closed or not open",
},
{
name: "empty object list",
setWriter: true,
expectedErr: "object list is empty",
},
{
name: "invalid object id",
objectIDs: []udmrepo.ID{
"I123456",
"fake-id",
"I678901",
},
setWriter: true,
expectedErr: "error to parse object ID from fake-id: malformed content ID: \"fake-id\": invalid content prefix",
},
{
name: "concatenate error",
rawWriter: repomocks.NewDirectRepositoryWriter(t),
rawWriterRetErr: errors.New("fake-concatenate-error"),
objectIDs: []udmrepo.ID{
"I123456",
},
setWriter: true,
expectedErr: "error to concatenate objects: fake-concatenate-error",
},
{
name: "succeed",
rawWriter: repomocks.NewDirectRepositoryWriter(t),
objectIDs: []udmrepo.ID{
"I123456",
},
setWriter: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
kr := &kopiaRepository{}
if tc.rawWriter != nil {
require.NotNil(t, tc.rawWriter)
tc.rawWriter.On("ConcatenateObjects", mock.Anything, mock.Anything).Return(object.ID{}, tc.rawWriterRetErr)
}
if tc.setWriter {
kr.rawWriter = tc.rawWriter
}
_, err := kr.ConcatenateObjects(context.Background(), tc.objectIDs)
if tc.expectedErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tc.expectedErr)
}
})
}
}
func TestNewObjectWriter(t *testing.T) {
rawObjWriter := repomocks.NewWriter(t)
testCases := []struct {

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.39.1. DO NOT EDIT.
package mocks
@ -20,6 +20,10 @@ type BackupRepo struct {
func (_m *BackupRepo) Close(ctx context.Context) error {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for Close")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
@ -30,10 +34,42 @@ func (_m *BackupRepo) Close(ctx context.Context) error {
return r0
}
// ConcatenateObjects provides a mock function with given fields: ctx, objectIDs
func (_m *BackupRepo) ConcatenateObjects(ctx context.Context, objectIDs []udmrepo.ID) (udmrepo.ID, error) {
ret := _m.Called(ctx, objectIDs)
if len(ret) == 0 {
panic("no return value specified for ConcatenateObjects")
}
var r0 udmrepo.ID
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, []udmrepo.ID) (udmrepo.ID, error)); ok {
return rf(ctx, objectIDs)
}
if rf, ok := ret.Get(0).(func(context.Context, []udmrepo.ID) udmrepo.ID); ok {
r0 = rf(ctx, objectIDs)
} else {
r0 = ret.Get(0).(udmrepo.ID)
}
if rf, ok := ret.Get(1).(func(context.Context, []udmrepo.ID) error); ok {
r1 = rf(ctx, objectIDs)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DeleteManifest provides a mock function with given fields: ctx, id
func (_m *BackupRepo) DeleteManifest(ctx context.Context, id udmrepo.ID) error {
ret := _m.Called(ctx, id)
if len(ret) == 0 {
panic("no return value specified for DeleteManifest")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ID) error); ok {
r0 = rf(ctx, id)
@ -48,7 +84,15 @@ func (_m *BackupRepo) DeleteManifest(ctx context.Context, id udmrepo.ID) error {
func (_m *BackupRepo) FindManifests(ctx context.Context, filter udmrepo.ManifestFilter) ([]*udmrepo.ManifestEntryMetadata, error) {
ret := _m.Called(ctx, filter)
if len(ret) == 0 {
panic("no return value specified for FindManifests")
}
var r0 []*udmrepo.ManifestEntryMetadata
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ManifestFilter) ([]*udmrepo.ManifestEntryMetadata, error)); ok {
return rf(ctx, filter)
}
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ManifestFilter) []*udmrepo.ManifestEntryMetadata); ok {
r0 = rf(ctx, filter)
} else {
@ -57,7 +101,6 @@ func (_m *BackupRepo) FindManifests(ctx context.Context, filter udmrepo.Manifest
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, udmrepo.ManifestFilter) error); ok {
r1 = rf(ctx, filter)
} else {
@ -71,6 +114,10 @@ func (_m *BackupRepo) FindManifests(ctx context.Context, filter udmrepo.Manifest
func (_m *BackupRepo) Flush(ctx context.Context) error {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for Flush")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
@ -81,10 +128,32 @@ func (_m *BackupRepo) Flush(ctx context.Context) error {
return r0
}
// GetAdvancedFeatures provides a mock function with given fields:
func (_m *BackupRepo) GetAdvancedFeatures() udmrepo.AdvancedFeatureInfo {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetAdvancedFeatures")
}
var r0 udmrepo.AdvancedFeatureInfo
if rf, ok := ret.Get(0).(func() udmrepo.AdvancedFeatureInfo); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(udmrepo.AdvancedFeatureInfo)
}
return r0
}
// GetManifest provides a mock function with given fields: ctx, id, mani
func (_m *BackupRepo) GetManifest(ctx context.Context, id udmrepo.ID, mani *udmrepo.RepoManifest) error {
ret := _m.Called(ctx, id, mani)
if len(ret) == 0 {
panic("no return value specified for GetManifest")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ID, *udmrepo.RepoManifest) error); ok {
r0 = rf(ctx, id, mani)
@ -99,6 +168,10 @@ func (_m *BackupRepo) GetManifest(ctx context.Context, id udmrepo.ID, mani *udmr
func (_m *BackupRepo) NewObjectWriter(ctx context.Context, opt udmrepo.ObjectWriteOptions) udmrepo.ObjectWriter {
ret := _m.Called(ctx, opt)
if len(ret) == 0 {
panic("no return value specified for NewObjectWriter")
}
var r0 udmrepo.ObjectWriter
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ObjectWriteOptions) udmrepo.ObjectWriter); ok {
r0 = rf(ctx, opt)
@ -115,7 +188,15 @@ func (_m *BackupRepo) NewObjectWriter(ctx context.Context, opt udmrepo.ObjectWri
func (_m *BackupRepo) OpenObject(ctx context.Context, id udmrepo.ID) (udmrepo.ObjectReader, error) {
ret := _m.Called(ctx, id)
if len(ret) == 0 {
panic("no return value specified for OpenObject")
}
var r0 udmrepo.ObjectReader
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ID) (udmrepo.ObjectReader, error)); ok {
return rf(ctx, id)
}
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.ID) udmrepo.ObjectReader); ok {
r0 = rf(ctx, id)
} else {
@ -124,7 +205,6 @@ func (_m *BackupRepo) OpenObject(ctx context.Context, id udmrepo.ID) (udmrepo.Ob
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, udmrepo.ID) error); ok {
r1 = rf(ctx, id)
} else {
@ -138,14 +218,21 @@ func (_m *BackupRepo) OpenObject(ctx context.Context, id udmrepo.ID) (udmrepo.Ob
func (_m *BackupRepo) PutManifest(ctx context.Context, mani udmrepo.RepoManifest) (udmrepo.ID, error) {
ret := _m.Called(ctx, mani)
if len(ret) == 0 {
panic("no return value specified for PutManifest")
}
var r0 udmrepo.ID
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoManifest) (udmrepo.ID, error)); ok {
return rf(ctx, mani)
}
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoManifest) udmrepo.ID); ok {
r0 = rf(ctx, mani)
} else {
r0 = ret.Get(0).(udmrepo.ID)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, udmrepo.RepoManifest) error); ok {
r1 = rf(ctx, mani)
} else {
@ -159,6 +246,10 @@ func (_m *BackupRepo) PutManifest(ctx context.Context, mani udmrepo.RepoManifest
func (_m *BackupRepo) Time() time.Time {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Time")
}
var r0 time.Time
if rf, ok := ret.Get(0).(func() time.Time); ok {
r0 = rf()
@ -169,13 +260,12 @@ func (_m *BackupRepo) Time() time.Time {
return r0
}
type mockConstructorTestingTNewBackupRepo interface {
// NewBackupRepo creates a new instance of BackupRepo. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewBackupRepo(t interface {
mock.TestingT
Cleanup(func())
}
// NewBackupRepo creates a new instance of BackupRepo. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewBackupRepo(t mockConstructorTestingTNewBackupRepo) *BackupRepo {
}) *BackupRepo {
mock := &BackupRepo{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.39.1. DO NOT EDIT.
package mocks
@ -20,6 +20,10 @@ type BackupRepoService struct {
func (_m *BackupRepoService) DefaultMaintenanceFrequency() time.Duration {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for DefaultMaintenanceFrequency")
}
var r0 time.Duration
if rf, ok := ret.Get(0).(func() time.Duration); ok {
r0 = rf()
@ -34,6 +38,10 @@ func (_m *BackupRepoService) DefaultMaintenanceFrequency() time.Duration {
func (_m *BackupRepoService) Init(ctx context.Context, repoOption udmrepo.RepoOptions, createNew bool) error {
ret := _m.Called(ctx, repoOption, createNew)
if len(ret) == 0 {
panic("no return value specified for Init")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoOptions, bool) error); ok {
r0 = rf(ctx, repoOption, createNew)
@ -48,6 +56,10 @@ func (_m *BackupRepoService) Init(ctx context.Context, repoOption udmrepo.RepoOp
func (_m *BackupRepoService) Maintain(ctx context.Context, repoOption udmrepo.RepoOptions) error {
ret := _m.Called(ctx, repoOption)
if len(ret) == 0 {
panic("no return value specified for Maintain")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoOptions) error); ok {
r0 = rf(ctx, repoOption)
@ -62,7 +74,15 @@ func (_m *BackupRepoService) Maintain(ctx context.Context, repoOption udmrepo.Re
func (_m *BackupRepoService) Open(ctx context.Context, repoOption udmrepo.RepoOptions) (udmrepo.BackupRepo, error) {
ret := _m.Called(ctx, repoOption)
if len(ret) == 0 {
panic("no return value specified for Open")
}
var r0 udmrepo.BackupRepo
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoOptions) (udmrepo.BackupRepo, error)); ok {
return rf(ctx, repoOption)
}
if rf, ok := ret.Get(0).(func(context.Context, udmrepo.RepoOptions) udmrepo.BackupRepo); ok {
r0 = rf(ctx, repoOption)
} else {
@ -71,7 +91,6 @@ func (_m *BackupRepoService) Open(ctx context.Context, repoOption udmrepo.RepoOp
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, udmrepo.RepoOptions) error); ok {
r1 = rf(ctx, repoOption)
} else {
@ -81,13 +100,12 @@ func (_m *BackupRepoService) Open(ctx context.Context, repoOption udmrepo.RepoOp
return r0, r1
}
type mockConstructorTestingTNewBackupRepoService interface {
// NewBackupRepoService creates a new instance of BackupRepoService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewBackupRepoService(t interface {
mock.TestingT
Cleanup(func())
}
// NewBackupRepoService creates a new instance of BackupRepoService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewBackupRepoService(t mockConstructorTestingTNewBackupRepoService) *BackupRepoService {
}) *BackupRepoService {
mock := &BackupRepoService{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.39.1. DO NOT EDIT.
package mocks
@ -13,6 +13,10 @@ type ObjectReader struct {
func (_m *ObjectReader) Close() error {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Close")
}
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
@ -27,6 +31,10 @@ func (_m *ObjectReader) Close() error {
func (_m *ObjectReader) Length() int64 {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Length")
}
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
@ -41,14 +49,21 @@ func (_m *ObjectReader) Length() int64 {
func (_m *ObjectReader) Read(p []byte) (int, error) {
ret := _m.Called(p)
if len(ret) == 0 {
panic("no return value specified for Read")
}
var r0 int
var r1 error
if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok {
return rf(p)
}
if rf, ok := ret.Get(0).(func([]byte) int); ok {
r0 = rf(p)
} else {
r0 = ret.Get(0).(int)
}
var r1 error
if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(p)
} else {
@ -62,14 +77,21 @@ func (_m *ObjectReader) Read(p []byte) (int, error) {
func (_m *ObjectReader) Seek(offset int64, whence int) (int64, error) {
ret := _m.Called(offset, whence)
if len(ret) == 0 {
panic("no return value specified for Seek")
}
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(int64, int) (int64, error)); ok {
return rf(offset, whence)
}
if rf, ok := ret.Get(0).(func(int64, int) int64); ok {
r0 = rf(offset, whence)
} else {
r0 = ret.Get(0).(int64)
}
var r1 error
if rf, ok := ret.Get(1).(func(int64, int) error); ok {
r1 = rf(offset, whence)
} else {
@ -79,13 +101,12 @@ func (_m *ObjectReader) Seek(offset int64, whence int) (int64, error) {
return r0, r1
}
type mockConstructorTestingTNewObjectReader interface {
// NewObjectReader creates a new instance of ObjectReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewObjectReader(t interface {
mock.TestingT
Cleanup(func())
}
// NewObjectReader creates a new instance of ObjectReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewObjectReader(t mockConstructorTestingTNewObjectReader) *ObjectReader {
}) *ObjectReader {
mock := &ObjectReader{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.39.1. DO NOT EDIT.
package mocks
@ -16,14 +16,21 @@ type ObjectWriter struct {
func (_m *ObjectWriter) Checkpoint() (udmrepo.ID, error) {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Checkpoint")
}
var r0 udmrepo.ID
var r1 error
if rf, ok := ret.Get(0).(func() (udmrepo.ID, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() udmrepo.ID); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(udmrepo.ID)
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
@ -37,6 +44,10 @@ func (_m *ObjectWriter) Checkpoint() (udmrepo.ID, error) {
func (_m *ObjectWriter) Close() error {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Close")
}
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
@ -51,14 +62,21 @@ func (_m *ObjectWriter) Close() error {
func (_m *ObjectWriter) Result() (udmrepo.ID, error) {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Result")
}
var r0 udmrepo.ID
var r1 error
if rf, ok := ret.Get(0).(func() (udmrepo.ID, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() udmrepo.ID); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(udmrepo.ID)
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
@ -72,14 +90,21 @@ func (_m *ObjectWriter) Result() (udmrepo.ID, error) {
func (_m *ObjectWriter) Seek(offset int64, whence int) (int64, error) {
ret := _m.Called(offset, whence)
if len(ret) == 0 {
panic("no return value specified for Seek")
}
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(int64, int) (int64, error)); ok {
return rf(offset, whence)
}
if rf, ok := ret.Get(0).(func(int64, int) int64); ok {
r0 = rf(offset, whence)
} else {
r0 = ret.Get(0).(int64)
}
var r1 error
if rf, ok := ret.Get(1).(func(int64, int) error); ok {
r1 = rf(offset, whence)
} else {
@ -93,14 +118,21 @@ func (_m *ObjectWriter) Seek(offset int64, whence int) (int64, error) {
func (_m *ObjectWriter) Write(p []byte) (int, error) {
ret := _m.Called(p)
if len(ret) == 0 {
panic("no return value specified for Write")
}
var r0 int
var r1 error
if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok {
return rf(p)
}
if rf, ok := ret.Get(0).(func([]byte) int); ok {
r0 = rf(p)
} else {
r0 = ret.Get(0).(int)
}
var r1 error
if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(p)
} else {
@ -110,13 +142,12 @@ func (_m *ObjectWriter) Write(p []byte) (int, error) {
return r0, r1
}
type mockConstructorTestingTNewObjectWriter interface {
// NewObjectWriter creates a new instance of ObjectWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewObjectWriter(t interface {
mock.TestingT
Cleanup(func())
}
// NewObjectWriter creates a new instance of ObjectWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewObjectWriter(t mockConstructorTestingTNewObjectWriter) *ObjectWriter {
}) *ObjectWriter {
mock := &ObjectWriter{}
mock.Mock.Test(t)

View File

@ -71,6 +71,10 @@ type ObjectWriteOptions struct {
AsyncWrites int // Num of async writes for the object, 0 means no async write
}
type AdvancedFeatureInfo struct {
MultiPartBackup bool // if set to true, it means the repo supports multiple-part backup
}
// BackupRepoService is used to initialize, open or maintain a backup repository
type BackupRepoService interface {
// Init creates a backup repository or connect to an existing backup repository.
@ -116,6 +120,12 @@ type BackupRepo interface {
// Flush flushes all the backup repository data
Flush(ctx context.Context) error
// GetAdvancedFeatures returns the support for advanced features
GetAdvancedFeatures() AdvancedFeatureInfo
// ConcatenateObjects is for multiple-part backup, it concatenates multiple objects into one object
ConcatenateObjects(ctx context.Context, objectIDs []ID) (ID, error)
// Time returns the local time of the backup repository. It may be different from the time of the caller
Time() time.Time

View File

@ -238,7 +238,21 @@ func (sr *shimRepository) Flush(ctx context.Context) error {
}
func (sr *shimRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
return object.ID{}, errors.New("ConcatenateObjects is not supported")
if len(objectIDs) == 0 {
return object.EmptyID, errors.New("object list is empty")
}
ids := []udmrepo.ID{}
for _, id := range objectIDs {
ids = append(ids, udmrepo.ID(id.String()))
}
id, err := sr.udmRepo.ConcatenateObjects(ctx, ids)
if err != nil {
return object.EmptyID, err
}
return object.ParseID(string(id))
}
func (sr *shimRepository) OnSuccessfulFlush(callback repo.RepositoryWriterCallback) {

View File

@ -65,10 +65,6 @@ func TestShimRepo(t *testing.T) {
backupRepo.On("Flush", mock.Anything).Return(nil)
NewShimRepo(backupRepo).Flush(ctx)
var objID object.ID
backupRepo.On("ConcatenateObjects", mock.Anything, mock.Anything).Return(objID)
NewShimRepo(backupRepo).ConcatenateObjects(ctx, []object.ID{})
backupRepo.On("NewObjectWriter", mock.Anything, mock.Anything).Return(nil)
NewShimRepo(backupRepo).NewObjectWriter(ctx, object.WriterOptions{})
}
@ -290,3 +286,64 @@ func TestReplaceManifests(t *testing.T) {
})
}
}
func TestConcatenateObjects(t *testing.T) {
tests := []struct {
name string
backupRepo *mocks.BackupRepo
objectIDs []object.ID
expectedError string
}{
{
name: "empty object list",
expectedError: "object list is empty",
},
{
name: "concatenate error",
backupRepo: func() *mocks.BackupRepo {
backupRepo := &mocks.BackupRepo{}
backupRepo.On("ConcatenateObjects", mock.Anything, mock.Anything).Return(udmrepo.ID(""), errors.New("fake-concatenate-error"))
return backupRepo
}(),
objectIDs: []object.ID{
{},
},
expectedError: "fake-concatenate-error",
},
{
name: "parse error",
backupRepo: func() *mocks.BackupRepo {
backupRepo := &mocks.BackupRepo{}
backupRepo.On("ConcatenateObjects", mock.Anything, mock.Anything).Return(udmrepo.ID("fake-id"), nil)
return backupRepo
}(),
objectIDs: []object.ID{
{},
},
expectedError: "malformed content ID: \"fake-id\": invalid content prefix",
},
{
name: "success",
backupRepo: func() *mocks.BackupRepo {
backupRepo := &mocks.BackupRepo{}
backupRepo.On("ConcatenateObjects", mock.Anything, mock.Anything).Return(udmrepo.ID("I123456"), nil)
return backupRepo
}(),
objectIDs: []object.ID{
{},
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
_, err := NewShimRepo(tc.backupRepo).ConcatenateObjects(ctx, tc.objectIDs)
if tc.expectedError != "" {
assert.EqualError(t, err, tc.expectedError)
} else {
assert.NoError(t, err)
}
})
}
}

View File

@ -54,6 +54,8 @@ var listSnapshotsFunc = snapshot.ListSnapshots
var filesystemEntryFunc = snapshotfs.FilesystemEntryFromIDWithPath
var restoreEntryFunc = restore.Entry
const UploaderConfigMultipartKey = "uploader-multipart"
// SnapshotUploader which mainly used for UT test that could overwrite Upload interface
type SnapshotUploader interface {
Upload(
@ -120,6 +122,10 @@ func setupPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snap
}
}
if _, ok := uploaderCfg[UploaderConfigMultipartKey]; ok {
curPolicy.UploadPolicy.ParallelUploadAboveSize = newOptionalInt64(2 << 30)
}
err := setPolicyFunc(ctx, rep, sourceInfo, curPolicy)
if err != nil {
return nil, errors.Wrap(err, "error to set policy")

View File

@ -159,6 +159,14 @@ func (kp *kopiaProvider) RunBackup(
realSource = fmt.Sprintf("%s/%s/%s", kp.requestorType, uploader.KopiaType, realSource)
}
if kp.bkRepo.GetAdvancedFeatures().MultiPartBackup {
if uploaderCfg == nil {
uploaderCfg = make(map[string]string)
}
uploaderCfg[kopia.UploaderConfigMultipartKey] = "true"
}
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log)
if err != nil {
if kpUploader.IsCanceled() {

View File

@ -62,8 +62,12 @@ type FakeRestoreProgressUpdater struct {
func (f *FakeRestoreProgressUpdater) UpdateProgress(p *uploader.Progress) {}
func TestRunBackup(t *testing.T) {
mockBRepo := udmrepomocks.NewBackupRepo(t)
mockBRepo.On("GetAdvancedFeatures").Return(udmrepo.AdvancedFeatureInfo{})
var kp kopiaProvider
kp.log = logrus.New()
kp.bkRepo = mockBRepo
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()}
testCases := []struct {