diff --git a/pkg/uploader/kopia/shim.go b/pkg/uploader/kopia/shim.go index 23fb6c4a3..94856ce97 100644 --- a/pkg/uploader/kopia/shim.go +++ b/pkg/uploader/kopia/shim.go @@ -210,7 +210,25 @@ func (sr *shimRepository) DeleteManifest(ctx context.Context, id manifest.ID) er } func (sr *shimRepository) ReplaceManifests(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) { - return manifest.ID(""), errors.New("ReplaceManifests is not supported") + const minReplaceManifestTimeDelta = 100 * time.Millisecond + + md, err := sr.FindManifests(ctx, labels) + if err != nil { + return "", errors.Wrap(err, "unable to load manifests") + } + + for _, em := range md { + age := sr.Time().Sub(em.ModTime) + if age < minReplaceManifestTimeDelta { + time.Sleep(minReplaceManifestTimeDelta) + } + + if err := sr.DeleteManifest(ctx, em.ID); err != nil { + return "", errors.Wrapf(err, "unable to delete previous manifest %v", em.ID) + } + } + + return sr.PutManifest(ctx, labels, payload) } // Flush all the unifited repository data diff --git a/pkg/uploader/kopia/shim_test.go b/pkg/uploader/kopia/shim_test.go index 50c99b9e1..a888d63ec 100644 --- a/pkg/uploader/kopia/shim_test.go +++ b/pkg/uploader/kopia/shim_test.go @@ -49,7 +49,6 @@ func TestShimRepo(t *testing.T) { shim.PrefetchObjects(ctx, []object.ID{}, "hint") shim.UpdateDescription("desc") shim.NewWriter(ctx, repo.WriteSessionOptions{}) - shim.ReplaceManifests(ctx, map[string]string{}, nil) shim.OnSuccessfulFlush(func(ctx context.Context, w repo.RepositoryWriter) error { return nil }) backupRepo.On("Close", mock.Anything).Return(nil) @@ -202,3 +201,92 @@ func TestShimObjWriter(t *testing.T) { objWriter.On("Close").Return(nil) writer.Close() } + +func TestReplaceManifests(t *testing.T) { + meta1 := udmrepo.ManifestEntryMetadata{ + ID: "mani-1", + } + + meta2 := udmrepo.ManifestEntryMetadata{ + ID: "mani-2", + } + + tests := []struct { + name string + backupRepo *mocks.BackupRepo + isGetManifestError bool + expectedError string + expectedID manifest.ID + }{ + { + name: "Failed to find manifest", + isGetManifestError: true, + backupRepo: func() *mocks.BackupRepo { + backupRepo := &mocks.BackupRepo{} + backupRepo.On("FindManifests", mock.Anything, mock.Anything).Return([]*udmrepo.ManifestEntryMetadata{}, + errors.New("fake-find-error")) + return backupRepo + }(), + expectedError: "unable to load manifests: failed to get manifests with labels map[]: fake-find-error", + }, + { + name: "Failed to delete manifest", + isGetManifestError: true, + backupRepo: func() *mocks.BackupRepo { + backupRepo := &mocks.BackupRepo{} + backupRepo.On("FindManifests", mock.Anything, mock.Anything).Return([]*udmrepo.ManifestEntryMetadata{ + &meta1, + &meta2, + }, nil) + backupRepo.On("Time").Return(time.Now()) + backupRepo.On("DeleteManifest", mock.Anything, mock.Anything).Return(errors.New("fake-delete-error")) + return backupRepo + }(), + expectedError: "unable to delete previous manifest mani-1: fake-delete-error", + }, + { + name: "Failed to put manifest", + backupRepo: func() *mocks.BackupRepo { + backupRepo := &mocks.BackupRepo{} + backupRepo.On("FindManifests", mock.Anything, mock.Anything).Return([]*udmrepo.ManifestEntryMetadata{ + &meta1, + &meta2, + }, nil) + backupRepo.On("Time").Return(time.Now()) + backupRepo.On("DeleteManifest", mock.Anything, mock.Anything).Return(nil) + backupRepo.On("PutManifest", mock.Anything, mock.Anything).Return(udmrepo.ID(""), errors.New("fake-put-error")) + return backupRepo + }(), + expectedError: "fake-put-error", + }, + { + name: "Success", + backupRepo: func() *mocks.BackupRepo { + backupRepo := &mocks.BackupRepo{} + backupRepo.On("FindManifests", mock.Anything, mock.Anything).Return([]*udmrepo.ManifestEntryMetadata{ + &meta1, + &meta2, + }, nil) + backupRepo.On("Time").Return(time.Now()) + backupRepo.On("DeleteManifest", mock.Anything, mock.Anything).Return(nil) + backupRepo.On("PutManifest", mock.Anything, mock.Anything).Return(udmrepo.ID("fake-id"), nil) + return backupRepo + }(), + expectedID: manifest.ID("fake-id"), + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + id, err := NewShimRepo(tc.backupRepo).ReplaceManifests(ctx, map[string]string{}, nil) + + if tc.expectedError != "" { + assert.EqualError(t, err, tc.expectedError) + } else { + assert.NoError(t, err) + } + + assert.Equal(t, tc.expectedID, id) + }) + } +} diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index e5f56e42e..acb4c00e8 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -44,7 +44,8 @@ import ( ) // All function mainly used to make testing more convenient -var applyRetentionPolicyFunc = policy.ApplyRetentionPolicy +var treeForSourceFunc = policy.TreeForSource +var setPolicyFunc = policy.SetPolicy var saveSnapshotFunc = snapshot.SaveSnapshot var loadSnapshotFunc = snapshot.LoadSnapshot var listSnapshotsFunc = snapshot.ListSnapshots @@ -72,18 +73,46 @@ func newOptionalBool(b bool) *policy.OptionalBool { return &ob } -// setupDefaultPolicy set default policy for kopia -func setupDefaultPolicy() *policy.Tree { - defaultPolicy := *policy.DefaultPolicy +func getDefaultPolicy() *policy.Policy { + return &policy.Policy{ + RetentionPolicy: policy.RetentionPolicy{ + KeepLatest: newOptionalInt(math.MaxInt32), + KeepAnnual: newOptionalInt(math.MaxInt32), + KeepDaily: newOptionalInt(math.MaxInt32), + KeepHourly: newOptionalInt(math.MaxInt32), + KeepMonthly: newOptionalInt(math.MaxInt32), + KeepWeekly: newOptionalInt(math.MaxInt32), + }, + CompressionPolicy: policy.CompressionPolicy{ + CompressorName: "none", + }, + UploadPolicy: policy.UploadPolicy{ + MaxParallelFileReads: newOptionalInt(runtime.NumCPU()), + ParallelUploadAboveSize: nil, + }, + SchedulingPolicy: policy.SchedulingPolicy{ + Manual: true, + }, + ErrorHandlingPolicy: policy.ErrorHandlingPolicy{ + IgnoreUnknownTypes: newOptionalBool(true), + }, + } +} - defaultPolicy.RetentionPolicy.KeepLatest = newOptionalInt(math.MaxInt32) - defaultPolicy.CompressionPolicy.CompressorName = "none" - defaultPolicy.UploadPolicy.MaxParallelFileReads = newOptionalInt(runtime.NumCPU()) - defaultPolicy.UploadPolicy.ParallelUploadAboveSize = nil - defaultPolicy.SchedulingPolicy.Manual = true - defaultPolicy.ErrorHandlingPolicy.IgnoreUnknownTypes = newOptionalBool(true) +func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snapshot.SourceInfo) (*policy.Tree, error) { + // some internal operations from Kopia code retrieves policies from repo directly, so we need to persist the policy to repo + err := setPolicyFunc(ctx, rep, sourceInfo, getDefaultPolicy()) + if err != nil { + return nil, errors.Wrap(err, "error to set policy") + } - return policy.BuildTree(nil, &defaultPolicy) + // retrieve policy from repo + policyTree, err := treeForSourceFunc(ctx, rep, sourceInfo) + if err != nil { + return nil, errors.Wrap(err, "error to retrieve policy") + } + + return policyTree, nil } // Backup backup specific sourcePath and update progress @@ -117,7 +146,7 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re Host: udmrepo.GetRepoDomain(), Path: filepath.Clean(realSource), } - if sourceInfo.Path == "" { + if realSource == "" { sourceInfo.Path = dir } @@ -213,7 +242,10 @@ func SnapshotSource( log.Infof("Using parent snapshot %s, start time %v, end time %v, description %s", previous[i].ID, previous[i].StartTime.ToTime(), previous[i].EndTime.ToTime(), previous[i].Description) } - policyTree := setupDefaultPolicy() + policyTree, err := setupDefaultPolicy(ctx, rep, sourceInfo) + if err != nil { + return "", 0, errors.Wrapf(err, "unable to set policy for si %v", sourceInfo) + } manifest, err := u.Upload(ctx, rootDir, policyTree, sourceInfo, previous...) if err != nil { @@ -227,10 +259,7 @@ func SnapshotSource( if _, err = saveSnapshotFunc(ctx, rep, manifest); err != nil { return "", 0, errors.Wrapf(err, "Failed to save kopia manifest %v", manifest.ID) } - _, err = applyRetentionPolicyFunc(ctx, rep, sourceInfo, true) - if err != nil { - return "", 0, errors.Wrapf(err, "Failed to apply kopia retention policy for si %v", sourceInfo) - } + if err = rep.Flush(ctx); err != nil { return "", 0, errors.Wrapf(err, "Failed to flush kopia repository") } diff --git a/pkg/uploader/kopia/snapshot_test.go b/pkg/uploader/kopia/snapshot_test.go index 2093b55d5..232ea92c4 100644 --- a/pkg/uploader/kopia/snapshot_test.go +++ b/pkg/uploader/kopia/snapshot_test.go @@ -26,6 +26,7 @@ import ( "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/policy" "github.com/kopia/kopia/snapshot/restore" "github.com/kopia/kopia/snapshot/snapshotfs" "github.com/pkg/errors" @@ -58,7 +59,8 @@ func injectSnapshotFuncs() *snapshotMockes { repoWriterMock: &repomocks.RepositoryWriter{}, } - applyRetentionPolicyFunc = s.policyMock.ApplyRetentionPolicy + setPolicyFunc = s.policyMock.SetPolicy + treeForSourceFunc = s.policyMock.TreeForSource loadSnapshotFunc = s.snapshotMock.LoadSnapshot saveSnapshotFunc = s.snapshotMock.SaveSnapshot return s @@ -135,13 +137,13 @@ func TestSnapshotSource(t *testing.T) { notError: false, }, { - name: "failed to apply policy", + name: "failed to set policy", args: []mockArgs{ {methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}}, {methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}}, {methodName: "TreeForSource", returns: []interface{}{nil, nil}}, - {methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, errors.New("failed to save snapshot")}}, - {methodName: "SetPolicy", returns: []interface{}{nil}}, + {methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}}, + {methodName: "SetPolicy", returns: []interface{}{errors.New("failed to set policy")}}, {methodName: "Upload", returns: []interface{}{manifest, nil}}, {methodName: "Flush", returns: []interface{}{nil}}, }, @@ -232,7 +234,7 @@ func TestReportSnapshotStatus(t *testing.T) { }, } - result, size, err := reportSnapshotStatus(manifest, setupDefaultPolicy()) + result, size, err := reportSnapshotStatus(manifest, policy.BuildTree(nil, getDefaultPolicy())) switch { case tc.shouldError && err == nil: