parent
8d3a67544d
commit
d13a23364f
|
@ -210,7 +210,25 @@ func (sr *shimRepository) DeleteManifest(ctx context.Context, id manifest.ID) er
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sr *shimRepository) ReplaceManifests(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) {
|
func (sr *shimRepository) ReplaceManifests(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) {
|
||||||
return manifest.ID(""), errors.New("ReplaceManifests is not supported")
|
const minReplaceManifestTimeDelta = 100 * time.Millisecond
|
||||||
|
|
||||||
|
md, err := sr.FindManifests(ctx, labels)
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.Wrap(err, "unable to load manifests")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, em := range md {
|
||||||
|
age := sr.Time().Sub(em.ModTime)
|
||||||
|
if age < minReplaceManifestTimeDelta {
|
||||||
|
time.Sleep(minReplaceManifestTimeDelta)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := sr.DeleteManifest(ctx, em.ID); err != nil {
|
||||||
|
return "", errors.Wrapf(err, "unable to delete previous manifest %v", em.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sr.PutManifest(ctx, labels, payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush all the unifited repository data
|
// Flush all the unifited repository data
|
||||||
|
|
|
@ -49,7 +49,6 @@ func TestShimRepo(t *testing.T) {
|
||||||
shim.PrefetchObjects(ctx, []object.ID{}, "hint")
|
shim.PrefetchObjects(ctx, []object.ID{}, "hint")
|
||||||
shim.UpdateDescription("desc")
|
shim.UpdateDescription("desc")
|
||||||
shim.NewWriter(ctx, repo.WriteSessionOptions{})
|
shim.NewWriter(ctx, repo.WriteSessionOptions{})
|
||||||
shim.ReplaceManifests(ctx, map[string]string{}, nil)
|
|
||||||
shim.OnSuccessfulFlush(func(ctx context.Context, w repo.RepositoryWriter) error { return nil })
|
shim.OnSuccessfulFlush(func(ctx context.Context, w repo.RepositoryWriter) error { return nil })
|
||||||
|
|
||||||
backupRepo.On("Close", mock.Anything).Return(nil)
|
backupRepo.On("Close", mock.Anything).Return(nil)
|
||||||
|
@ -202,3 +201,92 @@ func TestShimObjWriter(t *testing.T) {
|
||||||
objWriter.On("Close").Return(nil)
|
objWriter.On("Close").Return(nil)
|
||||||
writer.Close()
|
writer.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReplaceManifests(t *testing.T) {
|
||||||
|
meta1 := udmrepo.ManifestEntryMetadata{
|
||||||
|
ID: "mani-1",
|
||||||
|
}
|
||||||
|
|
||||||
|
meta2 := udmrepo.ManifestEntryMetadata{
|
||||||
|
ID: "mani-2",
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
backupRepo *mocks.BackupRepo
|
||||||
|
isGetManifestError bool
|
||||||
|
expectedError string
|
||||||
|
expectedID manifest.ID
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Failed to find manifest",
|
||||||
|
isGetManifestError: true,
|
||||||
|
backupRepo: func() *mocks.BackupRepo {
|
||||||
|
backupRepo := &mocks.BackupRepo{}
|
||||||
|
backupRepo.On("FindManifests", mock.Anything, mock.Anything).Return([]*udmrepo.ManifestEntryMetadata{},
|
||||||
|
errors.New("fake-find-error"))
|
||||||
|
return backupRepo
|
||||||
|
}(),
|
||||||
|
expectedError: "unable to load manifests: failed to get manifests with labels map[]: fake-find-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Failed to delete manifest",
|
||||||
|
isGetManifestError: true,
|
||||||
|
backupRepo: func() *mocks.BackupRepo {
|
||||||
|
backupRepo := &mocks.BackupRepo{}
|
||||||
|
backupRepo.On("FindManifests", mock.Anything, mock.Anything).Return([]*udmrepo.ManifestEntryMetadata{
|
||||||
|
&meta1,
|
||||||
|
&meta2,
|
||||||
|
}, nil)
|
||||||
|
backupRepo.On("Time").Return(time.Now())
|
||||||
|
backupRepo.On("DeleteManifest", mock.Anything, mock.Anything).Return(errors.New("fake-delete-error"))
|
||||||
|
return backupRepo
|
||||||
|
}(),
|
||||||
|
expectedError: "unable to delete previous manifest mani-1: fake-delete-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Failed to put manifest",
|
||||||
|
backupRepo: func() *mocks.BackupRepo {
|
||||||
|
backupRepo := &mocks.BackupRepo{}
|
||||||
|
backupRepo.On("FindManifests", mock.Anything, mock.Anything).Return([]*udmrepo.ManifestEntryMetadata{
|
||||||
|
&meta1,
|
||||||
|
&meta2,
|
||||||
|
}, nil)
|
||||||
|
backupRepo.On("Time").Return(time.Now())
|
||||||
|
backupRepo.On("DeleteManifest", mock.Anything, mock.Anything).Return(nil)
|
||||||
|
backupRepo.On("PutManifest", mock.Anything, mock.Anything).Return(udmrepo.ID(""), errors.New("fake-put-error"))
|
||||||
|
return backupRepo
|
||||||
|
}(),
|
||||||
|
expectedError: "fake-put-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Success",
|
||||||
|
backupRepo: func() *mocks.BackupRepo {
|
||||||
|
backupRepo := &mocks.BackupRepo{}
|
||||||
|
backupRepo.On("FindManifests", mock.Anything, mock.Anything).Return([]*udmrepo.ManifestEntryMetadata{
|
||||||
|
&meta1,
|
||||||
|
&meta2,
|
||||||
|
}, nil)
|
||||||
|
backupRepo.On("Time").Return(time.Now())
|
||||||
|
backupRepo.On("DeleteManifest", mock.Anything, mock.Anything).Return(nil)
|
||||||
|
backupRepo.On("PutManifest", mock.Anything, mock.Anything).Return(udmrepo.ID("fake-id"), nil)
|
||||||
|
return backupRepo
|
||||||
|
}(),
|
||||||
|
expectedID: manifest.ID("fake-id"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
id, err := NewShimRepo(tc.backupRepo).ReplaceManifests(ctx, map[string]string{}, nil)
|
||||||
|
|
||||||
|
if tc.expectedError != "" {
|
||||||
|
assert.EqualError(t, err, tc.expectedError)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, tc.expectedID, id)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -44,7 +44,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// All function mainly used to make testing more convenient
|
// All function mainly used to make testing more convenient
|
||||||
var applyRetentionPolicyFunc = policy.ApplyRetentionPolicy
|
var treeForSourceFunc = policy.TreeForSource
|
||||||
|
var setPolicyFunc = policy.SetPolicy
|
||||||
var saveSnapshotFunc = snapshot.SaveSnapshot
|
var saveSnapshotFunc = snapshot.SaveSnapshot
|
||||||
var loadSnapshotFunc = snapshot.LoadSnapshot
|
var loadSnapshotFunc = snapshot.LoadSnapshot
|
||||||
var listSnapshotsFunc = snapshot.ListSnapshots
|
var listSnapshotsFunc = snapshot.ListSnapshots
|
||||||
|
@ -72,18 +73,46 @@ func newOptionalBool(b bool) *policy.OptionalBool {
|
||||||
return &ob
|
return &ob
|
||||||
}
|
}
|
||||||
|
|
||||||
// setupDefaultPolicy set default policy for kopia
|
func getDefaultPolicy() *policy.Policy {
|
||||||
func setupDefaultPolicy() *policy.Tree {
|
return &policy.Policy{
|
||||||
defaultPolicy := *policy.DefaultPolicy
|
RetentionPolicy: policy.RetentionPolicy{
|
||||||
|
KeepLatest: newOptionalInt(math.MaxInt32),
|
||||||
|
KeepAnnual: newOptionalInt(math.MaxInt32),
|
||||||
|
KeepDaily: newOptionalInt(math.MaxInt32),
|
||||||
|
KeepHourly: newOptionalInt(math.MaxInt32),
|
||||||
|
KeepMonthly: newOptionalInt(math.MaxInt32),
|
||||||
|
KeepWeekly: newOptionalInt(math.MaxInt32),
|
||||||
|
},
|
||||||
|
CompressionPolicy: policy.CompressionPolicy{
|
||||||
|
CompressorName: "none",
|
||||||
|
},
|
||||||
|
UploadPolicy: policy.UploadPolicy{
|
||||||
|
MaxParallelFileReads: newOptionalInt(runtime.NumCPU()),
|
||||||
|
ParallelUploadAboveSize: nil,
|
||||||
|
},
|
||||||
|
SchedulingPolicy: policy.SchedulingPolicy{
|
||||||
|
Manual: true,
|
||||||
|
},
|
||||||
|
ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
|
||||||
|
IgnoreUnknownTypes: newOptionalBool(true),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
defaultPolicy.RetentionPolicy.KeepLatest = newOptionalInt(math.MaxInt32)
|
func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snapshot.SourceInfo) (*policy.Tree, error) {
|
||||||
defaultPolicy.CompressionPolicy.CompressorName = "none"
|
// some internal operations from Kopia code retrieves policies from repo directly, so we need to persist the policy to repo
|
||||||
defaultPolicy.UploadPolicy.MaxParallelFileReads = newOptionalInt(runtime.NumCPU())
|
err := setPolicyFunc(ctx, rep, sourceInfo, getDefaultPolicy())
|
||||||
defaultPolicy.UploadPolicy.ParallelUploadAboveSize = nil
|
if err != nil {
|
||||||
defaultPolicy.SchedulingPolicy.Manual = true
|
return nil, errors.Wrap(err, "error to set policy")
|
||||||
defaultPolicy.ErrorHandlingPolicy.IgnoreUnknownTypes = newOptionalBool(true)
|
}
|
||||||
|
|
||||||
return policy.BuildTree(nil, &defaultPolicy)
|
// retrieve policy from repo
|
||||||
|
policyTree, err := treeForSourceFunc(ctx, rep, sourceInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "error to retrieve policy")
|
||||||
|
}
|
||||||
|
|
||||||
|
return policyTree, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backup backup specific sourcePath and update progress
|
// Backup backup specific sourcePath and update progress
|
||||||
|
@ -117,7 +146,7 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re
|
||||||
Host: udmrepo.GetRepoDomain(),
|
Host: udmrepo.GetRepoDomain(),
|
||||||
Path: filepath.Clean(realSource),
|
Path: filepath.Clean(realSource),
|
||||||
}
|
}
|
||||||
if sourceInfo.Path == "" {
|
if realSource == "" {
|
||||||
sourceInfo.Path = dir
|
sourceInfo.Path = dir
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,7 +242,10 @@ func SnapshotSource(
|
||||||
log.Infof("Using parent snapshot %s, start time %v, end time %v, description %s", previous[i].ID, previous[i].StartTime.ToTime(), previous[i].EndTime.ToTime(), previous[i].Description)
|
log.Infof("Using parent snapshot %s, start time %v, end time %v, description %s", previous[i].ID, previous[i].StartTime.ToTime(), previous[i].EndTime.ToTime(), previous[i].Description)
|
||||||
}
|
}
|
||||||
|
|
||||||
policyTree := setupDefaultPolicy()
|
policyTree, err := setupDefaultPolicy(ctx, rep, sourceInfo)
|
||||||
|
if err != nil {
|
||||||
|
return "", 0, errors.Wrapf(err, "unable to set policy for si %v", sourceInfo)
|
||||||
|
}
|
||||||
|
|
||||||
manifest, err := u.Upload(ctx, rootDir, policyTree, sourceInfo, previous...)
|
manifest, err := u.Upload(ctx, rootDir, policyTree, sourceInfo, previous...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -227,10 +259,7 @@ func SnapshotSource(
|
||||||
if _, err = saveSnapshotFunc(ctx, rep, manifest); err != nil {
|
if _, err = saveSnapshotFunc(ctx, rep, manifest); err != nil {
|
||||||
return "", 0, errors.Wrapf(err, "Failed to save kopia manifest %v", manifest.ID)
|
return "", 0, errors.Wrapf(err, "Failed to save kopia manifest %v", manifest.ID)
|
||||||
}
|
}
|
||||||
_, err = applyRetentionPolicyFunc(ctx, rep, sourceInfo, true)
|
|
||||||
if err != nil {
|
|
||||||
return "", 0, errors.Wrapf(err, "Failed to apply kopia retention policy for si %v", sourceInfo)
|
|
||||||
}
|
|
||||||
if err = rep.Flush(ctx); err != nil {
|
if err = rep.Flush(ctx); err != nil {
|
||||||
return "", 0, errors.Wrapf(err, "Failed to flush kopia repository")
|
return "", 0, errors.Wrapf(err, "Failed to flush kopia repository")
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"github.com/kopia/kopia/repo"
|
"github.com/kopia/kopia/repo"
|
||||||
"github.com/kopia/kopia/repo/manifest"
|
"github.com/kopia/kopia/repo/manifest"
|
||||||
"github.com/kopia/kopia/snapshot"
|
"github.com/kopia/kopia/snapshot"
|
||||||
|
"github.com/kopia/kopia/snapshot/policy"
|
||||||
"github.com/kopia/kopia/snapshot/restore"
|
"github.com/kopia/kopia/snapshot/restore"
|
||||||
"github.com/kopia/kopia/snapshot/snapshotfs"
|
"github.com/kopia/kopia/snapshot/snapshotfs"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -58,7 +59,8 @@ func injectSnapshotFuncs() *snapshotMockes {
|
||||||
repoWriterMock: &repomocks.RepositoryWriter{},
|
repoWriterMock: &repomocks.RepositoryWriter{},
|
||||||
}
|
}
|
||||||
|
|
||||||
applyRetentionPolicyFunc = s.policyMock.ApplyRetentionPolicy
|
setPolicyFunc = s.policyMock.SetPolicy
|
||||||
|
treeForSourceFunc = s.policyMock.TreeForSource
|
||||||
loadSnapshotFunc = s.snapshotMock.LoadSnapshot
|
loadSnapshotFunc = s.snapshotMock.LoadSnapshot
|
||||||
saveSnapshotFunc = s.snapshotMock.SaveSnapshot
|
saveSnapshotFunc = s.snapshotMock.SaveSnapshot
|
||||||
return s
|
return s
|
||||||
|
@ -135,13 +137,13 @@ func TestSnapshotSource(t *testing.T) {
|
||||||
notError: false,
|
notError: false,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "failed to apply policy",
|
name: "failed to set policy",
|
||||||
args: []mockArgs{
|
args: []mockArgs{
|
||||||
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
|
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
|
||||||
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
|
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
|
||||||
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
|
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
|
||||||
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, errors.New("failed to save snapshot")}},
|
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
|
||||||
{methodName: "SetPolicy", returns: []interface{}{nil}},
|
{methodName: "SetPolicy", returns: []interface{}{errors.New("failed to set policy")}},
|
||||||
{methodName: "Upload", returns: []interface{}{manifest, nil}},
|
{methodName: "Upload", returns: []interface{}{manifest, nil}},
|
||||||
{methodName: "Flush", returns: []interface{}{nil}},
|
{methodName: "Flush", returns: []interface{}{nil}},
|
||||||
},
|
},
|
||||||
|
@ -232,7 +234,7 @@ func TestReportSnapshotStatus(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
result, size, err := reportSnapshotStatus(manifest, setupDefaultPolicy())
|
result, size, err := reportSnapshotStatus(manifest, policy.BuildTree(nil, getDefaultPolicy()))
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case tc.shouldError && err == nil:
|
case tc.shouldError && err == nil:
|
||||||
|
|
Loading…
Reference in New Issue