Make Kopia file parallelism configurable

Signed-off-by: Ming <mqiu@vmware.com>
pull/7000/head
Ming 2023-10-23 02:08:16 +00:00
parent 19f38f9623
commit 481cb60493
39 changed files with 241 additions and 61 deletions

View File

@ -0,0 +1 @@
Make Kopia file parallelism configurable

View File

@ -477,6 +477,15 @@ spec:
description: TTL is a time.Duration-parseable string describing how
long the Backup should be retained for.
type: string
uploaderConfig:
description: UploaderConfig specifies the configuration for the uploader.
nullable: true
properties:
parallelFilesUpload:
description: ParallelFilesUpload is the number of files parallel
uploads to perform when using the uploader.
type: integer
type: object
volumeSnapshotLocations:
description: VolumeSnapshotLocations is a list containing names of
VolumeSnapshotLocations associated with this backup.

View File

@ -121,6 +121,14 @@ spec:
description: Tags are a map of key-value pairs that should be applied
to the volume backup as tags.
type: object
uploaderConfig:
description: UploaderConfig specifies the configuration for the uploader.
properties:
parallelFilesUpload:
description: ParallelFilesUpload is the number of files parallel
uploads to perform when using the uploader.
type: integer
type: object
uploaderType:
description: UploaderType is the type of the uploader to handle the
data transfer.

View File

@ -186,6 +186,12 @@ spec:
- Continue
- Fail
type: string
waitForReady:
description: WaitForReady ensures command will
be launched when container is Ready instead
of Running.
nullable: true
type: boolean
waitTimeout:
description: WaitTimeout defines the maximum amount
of time Velero should wait for the container

View File

@ -514,6 +514,16 @@ spec:
description: TTL is a time.Duration-parseable string describing
how long the Backup should be retained for.
type: string
uploaderConfig:
description: UploaderConfig specifies the configuration for the
uploader.
nullable: true
properties:
parallelFilesUpload:
description: ParallelFilesUpload is the number of files parallel
uploads to perform when using the uploader.
type: integer
type: object
volumeSnapshotLocations:
description: VolumeSnapshotLocations is a list containing names
of VolumeSnapshotLocations associated with this backup.

File diff suppressed because one or more lines are too long

View File

@ -123,6 +123,14 @@ spec:
description: SourcePVC is the name of the PVC which the snapshot is
taken for.
type: string
uploaderConfig:
description: UploaderConfig specifies the configuration for the uploader.
properties:
parallelFilesUpload:
description: ParallelFilesUpload is the number of files parallel
uploads to perform when using the uploader.
type: integer
type: object
required:
- backupStorageLocation
- operationTimeout

File diff suppressed because one or more lines are too long

View File

@ -689,7 +689,8 @@ type Provider interface {
tags map[string]string,
forceFull bool,
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
volMode uploader.PersistentVolumeMode,
uploaderCfg shared.UploaderConfig,
updater uploader.ProgressUpdater) (string, bool, error)
RunRestore(

View File

@ -0,0 +1,23 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shared
// UploaderConfig defines the configuration for the uploader.
type UploaderConfig struct {
// ParallelFilesUpload is the number of files parallel uploads to perform when using the uploader.
ParallelFilesUpload int `json:"parallelFilesUpload,omitempty"`
}

View File

@ -19,6 +19,8 @@ package v1
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
)
type Metadata struct {
@ -175,6 +177,11 @@ type BackupSpec struct {
// If DataMover is "" or "velero", the built-in data mover will be used.
// +optional
DataMover string `json:"datamover,omitempty"`
// UploaderConfig specifies the configuration for the uploader.
// +optional
// +nullable
UploaderConfig shared.UploaderConfig `json:"uploaderConfig,omitempty"`
}
// BackupHooks contains custom behaviors that should be executed at different phases of the backup.

View File

@ -51,6 +51,9 @@ type PodVolumeBackupSpec struct {
// volume backup as tags.
// +optional
Tags map[string]string `json:"tags,omitempty"`
// UploaderConfig specifies the configuration for the uploader.
UploaderConfig shared.UploaderConfig `json:"uploaderConfig,omitempty"`
}
// PodVolumeBackupPhase represents the lifecycle phase of a PodVolumeBackup.

View File

@ -381,6 +381,7 @@ func (in *BackupSpec) DeepCopyInto(out *BackupSpec) {
*out = new(bool)
**out = **in
}
out.UploaderConfig = in.UploaderConfig
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackupSpec.
@ -784,6 +785,11 @@ func (in *ExecRestoreHook) DeepCopyInto(out *ExecRestoreHook) {
}
out.ExecTimeout = in.ExecTimeout
out.WaitTimeout = in.WaitTimeout
if in.WaitForReady != nil {
in, out := &in.WaitForReady, &out.WaitForReady
*out = new(bool)
**out = **in
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExecRestoreHook.
@ -946,6 +952,7 @@ func (in *PodVolumeBackupSpec) DeepCopyInto(out *PodVolumeBackupSpec) {
(*out)[key] = val
}
}
out.UploaderConfig = in.UploaderConfig
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodVolumeBackupSpec.

View File

@ -60,6 +60,9 @@ type DataUploadSpec struct {
// OperationTimeout specifies the time used to wait internal operations,
// before returning error as timeout.
OperationTimeout metav1.Duration `json:"operationTimeout"`
// UploaderConfig specifies the configuration for the uploader.
UploaderConfig shared.UploaderConfig `json:"uploaderConfig,omitempty"`
}
type SnapshotType string

View File

@ -236,6 +236,7 @@ func (in *DataUploadSpec) DeepCopyInto(out *DataUploadSpec) {
}
}
out.OperationTimeout = in.OperationTimeout
out.UploaderConfig = in.UploaderConfig
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataUploadSpec.

View File

@ -299,3 +299,9 @@ func (b *BackupBuilder) DataMover(name string) *BackupBuilder {
b.object.Spec.DataMover = name
return b
}
// ParallelFilesUpload sets the Backup's uploader parallel uploads
func (b *BackupBuilder) ParallelFilesUpload(parallel int) *BackupBuilder {
b.object.Spec.UploaderConfig.ParallelFilesUpload = parallel
return b
}

View File

@ -106,6 +106,7 @@ type CreateOptions struct {
ItemOperationTimeout time.Duration
ResPoliciesConfigmap string
client kbclient.WithWatch
ParallelFilesUpload int
}
func NewCreateOptions() *CreateOptions {
@ -151,6 +152,7 @@ func (o *CreateOptions) BindFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.ResPoliciesConfigmap, "resource-policies-configmap", "", "Reference to the resource policies configmap that backup using")
flags.StringVar(&o.DataMover, "data-mover", "", "Specify the data mover to be used by the backup. If the parameter is not set or set as 'velero', the built-in data mover will be used")
flags.IntVar(&o.ParallelFilesUpload, "parallel-files-upload", 0, "Number of files uploads simultaneously when running a backup. This is only applicable for the kopia uploader")
}
// BindWait binds the wait flag separately so it is not called by other create
@ -396,6 +398,9 @@ func (o *CreateOptions) BuildBackup(namespace string) (*velerov1api.Backup, erro
if o.ResPoliciesConfigmap != "" {
backupBuilder.ResourcePolicies(o.ResPoliciesConfigmap)
}
if o.ParallelFilesUpload > 0 {
backupBuilder.ParallelFilesUpload(o.ParallelFilesUpload)
}
}
backup := backupBuilder.ObjectMeta(builder.WithLabelsMap(o.Labels.Data())).Result()

View File

@ -185,7 +185,7 @@ func TestCreateCommand(t *testing.T) {
defaultVolumesToFsBackup := "true"
resPoliciesConfigmap := "cm-name-2"
dataMover := "velero"
parallelFilesUpload := 10
flags := new(flag.FlagSet)
o := NewCreateOptions()
o.BindFlags(flags)
@ -213,6 +213,7 @@ func TestCreateCommand(t *testing.T) {
flags.Parse([]string{"--default-volumes-to-fs-backup", defaultVolumesToFsBackup})
flags.Parse([]string{"--resource-policies-configmap", resPoliciesConfigmap})
flags.Parse([]string{"--data-mover", dataMover})
flags.Parse([]string{"--parallel-files-upload", fmt.Sprintf("%d", parallelFilesUpload)})
//flags.Parse([]string{"--wait"})
client := velerotest.NewFakeControllerRuntimeClient(t).(kbclient.WithWatch)
@ -261,6 +262,7 @@ func TestCreateCommand(t *testing.T) {
require.Equal(t, defaultVolumesToFsBackup, o.DefaultVolumesToFsBackup.String())
require.Equal(t, resPoliciesConfigmap, o.ResPoliciesConfigmap)
require.Equal(t, dataMover, o.DataMover)
require.Equal(t, parallelFilesUpload, o.ParallelFilesUpload)
//assert.Equal(t, true, o.Wait)
// verify oldAndNewFilterParametersUsedTogether

View File

@ -88,6 +88,11 @@ func DescribeBackup(
DescribeResourcePolicies(d, backup.Spec.ResourcePolicy)
}
if backup.Spec.UploaderConfig.ParallelFilesUpload > 0 {
d.Println()
DescribeUploaderConfig(d, backup.Spec)
}
status := backup.Status
if len(status.ValidationErrors) > 0 {
d.Println()
@ -123,13 +128,19 @@ func DescribeBackup(
})
}
// DescribeResourcePolicies describes resource policiesin human-readable format
// DescribeResourcePolicies describes resource policies in human-readable format
func DescribeResourcePolicies(d *Describer, resPolicies *v1.TypedLocalObjectReference) {
d.Printf("Resource policies:\n")
d.Printf("\tType:\t%s\n", resPolicies.Kind)
d.Printf("\tName:\t%s\n", resPolicies.Name)
}
// DescribeUploaderConfig describes uploader config in human-readable format
func DescribeUploaderConfig(d *Describer, spec velerov1api.BackupSpec) {
d.Printf("Uploader config:\n")
d.Printf("\tParallel files upload:\t%d\n", spec.UploaderConfig.ParallelFilesUpload)
}
// DescribeBackupSpec describes a backup spec in human-readable format.
func DescribeBackupSpec(d *Describer, spec velerov1api.BackupSpec) {
// TODO make a helper for this and use it in all the describers.

View File

@ -20,6 +20,22 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)
func TestDescribeUploaderConfig(t *testing.T) {
input := builder.ForBackup("test-ns", "test-backup-1").ParallelFilesUpload(10).Result().Spec
d := &Describer{
Prefix: "",
out: &tabwriter.Writer{},
buf: &bytes.Buffer{},
}
d.out.Init(d.buf, 0, 8, 2, ' ', 0)
DescribeUploaderConfig(d, input)
d.out.Flush()
expect := `Uploader config:
Parallel files upload: 10
`
assert.Equal(t, expect, d.buf.String())
}
func TestDescribeResourcePolicies(t *testing.T) {
input := &v1.TypedLocalObjectReference{
Kind: "configmap",

View File

@ -59,7 +59,7 @@ Last Backup: <never>
input2 := builder.ForSchedule("velero", "schedule-2").
Phase(velerov1api.SchedulePhaseEnabled).
CronSchedule("0 0 * * *").
Template(builder.ForBackup("velero", "backup-1").Result().Spec).
Template(builder.ForBackup("velero", "backup-1").ParallelFilesUpload(10).Result().Spec).
LastBackupTime("2023-06-25 15:04:05").Result()
expect2 := `Name: schedule-2
Namespace: velero
@ -68,6 +68,9 @@ Annotations: <none>
Phase: Enabled
Uploader config:
Parallel files upload: 10
Paused: false
Schedule: 0 0 * * *

View File

@ -48,6 +48,11 @@ func DescribeSchedule(schedule *v1.Schedule) string {
DescribeResourcePolicies(d, schedule.Spec.Template.ResourcePolicy)
}
if schedule.Spec.Template.UploaderConfig.ParallelFilesUpload > 0 {
d.Println()
DescribeUploaderConfig(d, schedule.Spec.Template)
}
status := schedule.Status
if len(status.ValidationErrors) > 0 {
d.Println()

View File

@ -342,7 +342,7 @@ func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, fsBa
tags := map[string]string{
velerov1api.AsyncOperationIDLabel: du.Labels[velerov1api.AsyncOperationIDLabel],
}
if err := fsBackup.StartBackup(path, fmt.Sprintf("%s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC), "", false, tags); err != nil {
if err := fsBackup.StartBackup(path, fmt.Sprintf("%s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC), "", false, tags, du.Spec.UploaderConfig); err != nil {
return r.errorOut(ctx, du, err, "error starting data path backup", log)
}

View File

@ -45,6 +45,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/builder"
@ -281,7 +282,7 @@ func (f *fakeDataUploadFSBR) Init(ctx context.Context, bslName string, sourceNam
return nil
}
func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error {
func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfigs shared.UploaderConfig) error {
du := f.du
original := f.du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted

View File

@ -178,7 +178,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
}
if err := fsBackup.StartBackup(path, "", parentSnapshotID, false, pvb.Spec.Tags); err != nil {
if err := fsBackup.StartBackup(path, "", parentSnapshotID, false, pvb.Spec.Tags, pvb.Spec.UploaderConfig); err != nil {
return r.errorOut(ctx, &pvb, err, "error starting data path backup", log)
}

View File

@ -37,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
@ -103,7 +104,7 @@ func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace str
return nil
}
func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error {
func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfigs shared.UploaderConfig) error {
pvb := b.pvb
original := b.pvb.DeepCopy()

View File

@ -24,6 +24,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
@ -129,14 +130,14 @@ func (fs *fileSystemBR) Close(ctx context.Context) {
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed")
}
func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error {
func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfigs shared.UploaderConfig) error {
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull,
parentSnapshot, source.VolMode, fs)
parentSnapshot, source.VolMode, uploaderConfigs, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)

View File

@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader/provider"
providerMock "github.com/vmware-tanzu/velero/pkg/uploader/provider/mocks"
@ -95,12 +96,12 @@ func TestAsyncBackup(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
fs.uploaderProv = mockProvider
fs.initialized = true
fs.callbacks = test.callbacks
err := fs.StartBackup(AccessPoint{ByPath: test.path}, "", "", false, nil)
err := fs.StartBackup(AccessPoint{ByPath: test.path}, "", "", false, nil, shared.UploaderConfig{})
require.Equal(t, nil, err)
<-finish

View File

@ -11,6 +11,8 @@ import (
mock "github.com/stretchr/testify/mock"
repository "github.com/vmware-tanzu/velero/pkg/repository"
shared "github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
)
// AsyncBR is an autogenerated mock type for the AsyncBR type
@ -42,13 +44,13 @@ func (_m *AsyncBR) Init(ctx context.Context, bslName string, sourceNamespace str
return r0
}
// StartBackup provides a mock function with given fields: source, realSource, parentSnapshot, forceFull, tags
func (_m *AsyncBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error {
ret := _m.Called(source, realSource, parentSnapshot, forceFull, tags)
// StartBackup provides a mock function with given fields: source, realSource, parentSnapshot, forceFull, tags, uploaderConfig
func (_m *AsyncBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfig shared.UploaderConfig) error {
ret := _m.Called(source, realSource, parentSnapshot, forceFull, tags, uploaderConfig)
var r0 error
if rf, ok := ret.Get(0).(func(datapath.AccessPoint, string, string, bool, map[string]string) error); ok {
r0 = rf(source, realSource, parentSnapshot, forceFull, tags)
if rf, ok := ret.Get(0).(func(datapath.AccessPoint, string, string, bool, map[string]string, shared.UploaderConfig) error); ok {
r0 = rf(source, realSource, parentSnapshot, forceFull, tags, uploaderConfig)
} else {
r0 = ret.Error(0)
}

View File

@ -20,6 +20,7 @@ import (
"context"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
)
@ -62,7 +63,7 @@ type AsyncBR interface {
Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error
// StartBackup starts an asynchronous data path instance for backup
StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error
StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfig shared.UploaderConfig) error
// StartRestore starts an asynchronous data path instance for restore
StartRestore(snapshotID string, target AccessPoint) error

View File

@ -401,6 +401,7 @@ func newPodVolumeBackup(backup *velerov1api.Backup, pod *corev1api.Pod, volume c
BackupStorageLocation: backup.Spec.StorageLocation,
RepoIdentifier: repoIdentifier,
UploaderType: uploaderType,
UploaderConfig: backup.Spec.UploaderConfig,
},
}

View File

@ -28,10 +28,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/vmware-tanzu/velero/pkg/kopia"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/localfs"
"github.com/kopia/kopia/repo"
@ -41,6 +37,11 @@ import (
"github.com/kopia/kopia/snapshot/restore"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
"github.com/vmware-tanzu/velero/pkg/kopia"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
"github.com/vmware-tanzu/velero/pkg/uploader"
)
// All function mainly used to make testing more convenient
@ -104,9 +105,14 @@ func getDefaultPolicy() *policy.Policy {
}
}
func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snapshot.SourceInfo) (*policy.Tree, error) {
func setupPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snapshot.SourceInfo, uploaderCfg shared.UploaderConfig) (*policy.Tree, error) {
// some internal operations from Kopia code retrieves policies from repo directly, so we need to persist the policy to repo
err := setPolicyFunc(ctx, rep, sourceInfo, getDefaultPolicy())
curPolicy := getDefaultPolicy()
if uploaderCfg.ParallelFilesUpload > 0 {
curPolicy.UploadPolicy.MaxParallelFileReads = newOptionalInt(uploaderCfg.ParallelFilesUpload)
}
err := setPolicyFunc(ctx, rep, sourceInfo, curPolicy)
if err != nil {
return nil, errors.Wrap(err, "error to set policy")
}
@ -127,7 +133,7 @@ func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceIn
// Backup backup specific sourcePath and update progress
func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string,
forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg shared.UploaderConfig, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
if fsUploader == nil {
return nil, false, errors.New("get empty kopia uploader")
}
@ -172,7 +178,7 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re
}
kopiaCtx := kopia.SetupKopiaLog(ctx, log)
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, log, "Kopia Uploader")
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, uploaderCfg, log, "Kopia Uploader")
if err != nil {
return nil, false, err
}
@ -223,6 +229,7 @@ func SnapshotSource(
forceFull bool,
parentSnapshot string,
snapshotTags map[string]string,
uploaderCfg shared.UploaderConfig,
log logrus.FieldLogger,
description string,
) (string, int64, error) {
@ -258,7 +265,7 @@ func SnapshotSource(
log.Infof("Using parent snapshot %s, start time %v, end time %v, description %s", previous[i].ID, previous[i].StartTime.ToTime(), previous[i].EndTime.ToTime(), previous[i].Description)
}
policyTree, err := setupDefaultPolicy(ctx, rep, sourceInfo)
policyTree, err := setupPolicy(ctx, rep, sourceInfo, uploaderCfg)
if err != nil {
return "", 0, errors.Wrapf(err, "unable to set policy for si %v", sourceInfo)
}

View File

@ -35,6 +35,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
repomocks "github.com/vmware-tanzu/velero/pkg/repository/mocks"
"github.com/vmware-tanzu/velero/pkg/uploader"
uploadermocks "github.com/vmware-tanzu/velero/pkg/uploader/mocks"
@ -94,9 +95,10 @@ func TestSnapshotSource(t *testing.T) {
}
testCases := []struct {
name string
args []mockArgs
notError bool
name string
args []mockArgs
uploaderCfg shared.UploaderConfig
notError bool
}{
{
name: "regular test",
@ -150,6 +152,20 @@ func TestSnapshotSource(t *testing.T) {
},
notError: false,
},
{
name: "set policy with ParallelFilesUpload",
args: []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
{methodName: "SetPolicy", returns: []interface{}{nil}},
{methodName: "Upload", returns: []interface{}{manifest, nil}},
{methodName: "Flush", returns: []interface{}{nil}},
},
uploaderCfg: shared.UploaderConfig{ParallelFilesUpload: 10},
notError: true,
},
{
name: "failed to upload snapshot",
args: []mockArgs{
@ -182,7 +198,7 @@ func TestSnapshotSource(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s := injectSnapshotFuncs()
MockFuncs(s, tc.args)
_, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, false, "/", nil, log, "TestSnapshotSource")
_, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, false, "/", nil, tc.uploaderCfg, log, "TestSnapshotSource")
if tc.notError {
assert.NoError(t, err)
} else {
@ -630,9 +646,9 @@ func TestBackup(t *testing.T) {
var snapshotInfo *uploader.SnapshotInfo
var err error
if tc.isEmptyUploader {
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), nil, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, tc.tags, &logrus.Logger{})
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), nil, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, shared.UploaderConfig{}, tc.tags, &logrus.Logger{})
} else {
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), s.uploderMock, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, tc.tags, &logrus.Logger{})
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), s.uploderMock, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, shared.UploaderConfig{}, tc.tags, &logrus.Logger{})
}
// Check if the returned error matches the expected error
if tc.expectedError != nil {

View File

@ -30,6 +30,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/uploader/kopia"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
repokeys "github.com/vmware-tanzu/velero/pkg/repository/keys"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
@ -119,6 +120,7 @@ func (kp *kopiaProvider) RunBackup(
forceFull bool,
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg shared.UploaderConfig,
updater uploader.ProgressUpdater) (string, bool, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
@ -158,7 +160,7 @@ func (kp *kopiaProvider) RunBackup(
realSource = fmt.Sprintf("%s/%s/%s", kp.requestorType, uploader.KopiaType, realSource)
}
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, tags, log)
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log)
if err != nil {
if kpUploader.IsCanceled() {
log.Error("Kopia backup is canceled")

View File

@ -34,6 +34,7 @@ import (
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/credentials/mocks"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
"github.com/vmware-tanzu/velero/pkg/repository"
@ -68,34 +69,34 @@ func TestRunBackup(t *testing.T) {
testCases := []struct {
name string
hookBackupFunc func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
hookBackupFunc func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg shared.UploaderConfig, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
volMode uploader.PersistentVolumeMode
notError bool
}{
{
name: "success to backup",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg shared.UploaderConfig, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, nil
},
notError: true,
},
{
name: "get error to backup",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg shared.UploaderConfig, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, errors.New("failed to backup")
},
notError: false,
},
{
name: "got empty snapshot",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg shared.UploaderConfig, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, errors.New("snapshot is empty")
},
notError: false,
},
{
name: "success to backup block mode volume",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg shared.UploaderConfig, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, nil
},
volMode: uploader.PersistentVolumeBlock,
@ -108,7 +109,7 @@ func TestRunBackup(t *testing.T) {
tc.volMode = uploader.PersistentVolumeFilesystem
}
BackupFunc = tc.hookBackupFunc
_, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, &updater)
_, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, shared.UploaderConfig{}, &updater)
if tc.notError {
assert.NoError(t, err)
} else {

View File

@ -7,6 +7,8 @@ import (
mock "github.com/stretchr/testify/mock"
shared "github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
uploader "github.com/vmware-tanzu/velero/pkg/uploader"
)
@ -29,30 +31,30 @@ func (_m *Provider) Close(ctx context.Context) error {
return r0
}
// RunBackup provides a mock function with given fields: ctx, path, realSource, tags, forceFull, parentSnapshot, updater
func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) (string, bool, error) {
ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, updater)
// RunBackup provides a mock function with given fields: ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater
func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg shared.UploaderConfig, updater uploader.ProgressUpdater) (string, bool, error) {
ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
var r0 string
var r1 bool
var r2 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) (string, bool, error)); ok {
return rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, shared.UploaderConfig, uploader.ProgressUpdater) (string, bool, error)); ok {
return rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
}
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) string); ok {
r0 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, shared.UploaderConfig, uploader.ProgressUpdater) string); ok {
r0 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
} else {
r0 = ret.Get(0).(string)
}
if rf, ok := ret.Get(1).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) bool); ok {
r1 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
if rf, ok := ret.Get(1).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, shared.UploaderConfig, uploader.ProgressUpdater) bool); ok {
r1 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
} else {
r1 = ret.Get(1).(bool)
}
if rf, ok := ret.Get(2).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) error); ok {
r2 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
if rf, ok := ret.Get(2).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, shared.UploaderConfig, uploader.ProgressUpdater) error); ok {
r2 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
} else {
r2 = ret.Error(2)
}
@ -60,13 +62,13 @@ func (_m *Provider) RunBackup(ctx context.Context, path string, realSource strin
return r0, r1, r2
}
// RunRestore provides a mock function with given fields: ctx, snapshotID, volumePath, updater
// RunRestore provides a mock function with given fields: ctx, snapshotID, volumePath, volMode, updater
func (_m *Provider) RunRestore(ctx context.Context, snapshotID string, volumePath string, volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) error {
ret := _m.Called(ctx, snapshotID, volumePath, volMode, updater)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.ProgressUpdater) error); ok {
r0 = rf(ctx, snapshotID, volumePath, updater)
if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.PersistentVolumeMode, uploader.ProgressUpdater) error); ok {
r0 = rf(ctx, snapshotID, volumePath, volMode, updater)
} else {
r0 = ret.Error(0)
}

View File

@ -28,6 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/uploader"
)
@ -49,6 +50,7 @@ type Provider interface {
forceFull bool,
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg shared.UploaderConfig,
updater uploader.ProgressUpdater) (string, bool, error)
// RunRestore which will do restore for one specific volume with given snapshot id and return error
// updater is used for updating backup progress which implement by third-party

View File

@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/uploader"
@ -122,6 +123,7 @@ func (rp *resticProvider) RunBackup(
forceFull bool,
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg shared.UploaderConfig,
updater uploader.ProgressUpdater) (string, bool, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
@ -144,6 +146,10 @@ func (rp *resticProvider) RunBackup(
"parentSnapshot": parentSnapshot,
})
if uploaderCfg.ParallelFilesUpload > 0 {
log.Warnf("ParallelFilesUpload is set to %d, but restic does not support parallel file uploads. Ignoring.", uploaderCfg.ParallelFilesUpload)
}
backupCmd := resticBackupCMDFunc(rp.repoIdentifier, rp.credentialsFile, path, tags)
backupCmd.Env = rp.cmdEnv
backupCmd.CACertFile = rp.caCertFile

View File

@ -31,6 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
@ -149,9 +150,9 @@ func TestResticRunBackup(t *testing.T) {
}
if !tc.nilUpdater {
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, &updater)
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, shared.UploaderConfig{}, &updater)
} else {
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, nil)
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, shared.UploaderConfig{}, nil)
}
tc.rp.log.Infof("test name %v error %v", tc.name, err)