Merge pull request #7512 from qiuming-best/support-parallel-restore
Make parallel restore configurablepull/7557/head
commit
365423d220
|
@ -0,0 +1,2 @@
|
|||
Make parallel restore configurable
|
||||
|
|
@ -422,6 +422,10 @@ spec:
|
|||
description: UploaderConfig specifies the configuration for the restore.
|
||||
nullable: true
|
||||
properties:
|
||||
parallelFilesDownload:
|
||||
description: ParallelFilesDownload is the concurrency number setting
|
||||
for restore.
|
||||
type: integer
|
||||
writeSparseFiles:
|
||||
description: WriteSparseFiles is a flag to indicate whether write
|
||||
files sparsely or not.
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -177,5 +177,54 @@ Roughly, the process is as follows:
|
|||
4. Each respective controller within the CRs calls the uploader, and the WriteSparseFiles from map in CRs is passed to the uploader.
|
||||
5. When the uploader subsequently calls the Kopia API, it can use the WriteSparseFiles to set the WriteSparseFiles parameter, and if the uploader calls the Restic command it would append `--sparse` flag within the restore command.
|
||||
|
||||
### Parallel Restore
|
||||
Setting the parallelism of restore operations can improve the efficiency and speed of the restore process, especially when dealing with large amounts of data.
|
||||
|
||||
### Velero CLI
|
||||
The Velero CLI will support a --parallel-files-download flag, allowing users to set the parallelism value when creating restores. When no value is specified, it defaults to the number of CPUs of the node that the node agent pod is running on.
|
||||
```bash
|
||||
velero restore create --parallel-files-download $num
|
||||
```
|
||||
|
||||
### UploaderConfig
|
||||
Below, the sub-option `parallelFilesDownload` is added into `UploaderConfig`:
|
||||
|
||||
```go
|
||||
type UploaderConfigForRestore struct {
|
||||
// ParallelFilesDownload is the concurrency number setting for restore.
|
||||
// +optional
|
||||
ParallelFilesDownload int `json:"parallelFilesDownload,omitempty"`
|
||||
}
|
||||
```
|
||||
|
||||
#### Kopia Parallel Restore Policy
|
||||
|
||||
Velero Uploader can set restore policies when calling Kopia APIs. In the Kopia codebase, the structure for restore policies is defined as follows:
|
||||
|
||||
```go
|
||||
// first get concurrency from uploader config
|
||||
restoreConcurrency, _ := uploaderutil.GetRestoreConcurrency(uploaderCfg)
|
||||
// set restore concurrency into restore options
|
||||
restoreOpt := restore.Options{
|
||||
Parallel: restoreConcurrency,
|
||||
}
|
||||
// do restore with restore option
|
||||
restore.Entry(..., restoreOpt)
|
||||
```
|
||||
|
||||
#### Restic Parallel Restore Policy
|
||||
|
||||
Configurable parallel restore is not supported by Restic, so we would return an error if the option is configured.
|
||||
```go
|
||||
restoreConcurrency, err := uploaderutil.GetRestoreConcurrency(uploaderCfg)
|
||||
if err != nil {
|
||||
return extraFlags, errors.Wrap(err, "failed to get uploader config")
|
||||
}
|
||||
|
||||
if restoreConcurrency > 0 {
|
||||
return extraFlags, errors.New("restic does not support parallel restore")
|
||||
}
|
||||
```
|
||||
|
||||
## Alternatives Considered
|
||||
To enhance extensibility further, the option of storing `UploaderConfig` in a Kubernetes ConfigMap can be explored, this approach would allow the addition and modification of configuration options without the need to modify the CRD.
|
||||
|
|
|
@ -136,6 +136,9 @@ type UploaderConfigForRestore struct {
|
|||
// +optional
|
||||
// +nullable
|
||||
WriteSparseFiles *bool `json:"writeSparseFiles,omitempty"`
|
||||
// ParallelFilesDownload is the concurrency number setting for restore.
|
||||
// +optional
|
||||
ParallelFilesDownload int `json:"parallelFilesDownload,omitempty"`
|
||||
}
|
||||
|
||||
// RestoreHooks contains custom behaviors that should be executed during or post restore.
|
||||
|
|
|
@ -171,9 +171,3 @@ func (b *RestoreBuilder) ItemOperationTimeout(timeout time.Duration) *RestoreBui
|
|||
b.object.Spec.ItemOperationTimeout.Duration = timeout
|
||||
return b
|
||||
}
|
||||
|
||||
// WriteSparseFiles sets the Restore's uploader write sparse files
|
||||
func (b *RestoreBuilder) WriteSparseFiles(val bool) *RestoreBuilder {
|
||||
b.object.Spec.UploaderConfig.WriteSparseFiles = &val
|
||||
return b
|
||||
}
|
||||
|
|
|
@ -99,6 +99,7 @@ type CreateOptions struct {
|
|||
ItemOperationTimeout time.Duration
|
||||
ResourceModifierConfigMap string
|
||||
WriteSparseFiles flag.OptionalBool
|
||||
ParallelFilesDownload int
|
||||
client kbclient.WithWatch
|
||||
}
|
||||
|
||||
|
@ -151,6 +152,8 @@ func (o *CreateOptions) BindFlags(flags *pflag.FlagSet) {
|
|||
|
||||
f = flags.VarPF(&o.WriteSparseFiles, "write-sparse-files", "", "Whether to write sparse files during restoring volumes")
|
||||
f.NoOptDefVal = cmd.TRUE
|
||||
|
||||
flags.IntVar(&o.ParallelFilesDownload, "parallel-files-download", 0, "The number of restore operations to run in parallel. If set to 0, the default parallelism will be the number of CPUs for the node that node agent pod is running.")
|
||||
}
|
||||
|
||||
func (o *CreateOptions) Complete(args []string, f client.Factory) error {
|
||||
|
@ -200,6 +203,10 @@ func (o *CreateOptions) Validate(c *cobra.Command, args []string, f client.Facto
|
|||
return errors.New("existing-resource-policy has invalid value, it accepts only none, update as value")
|
||||
}
|
||||
|
||||
if o.ParallelFilesDownload < 0 {
|
||||
return errors.New("parallel-files-download cannot be negative")
|
||||
}
|
||||
|
||||
switch {
|
||||
case o.BackupName != "":
|
||||
backup := new(api.Backup)
|
||||
|
@ -324,7 +331,8 @@ func (o *CreateOptions) Run(c *cobra.Command, f client.Factory) error {
|
|||
Duration: o.ItemOperationTimeout,
|
||||
},
|
||||
UploaderConfig: &api.UploaderConfigForRestore{
|
||||
WriteSparseFiles: o.WriteSparseFiles.Value,
|
||||
WriteSparseFiles: o.WriteSparseFiles.Value,
|
||||
ParallelFilesDownload: o.ParallelFilesDownload,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -85,7 +85,7 @@ func TestCreateCommand(t *testing.T) {
|
|||
allowPartiallyFailed := "true"
|
||||
itemOperationTimeout := "10m0s"
|
||||
writeSparseFiles := "true"
|
||||
|
||||
parallel := 2
|
||||
flags := new(pflag.FlagSet)
|
||||
o := NewCreateOptions()
|
||||
o.BindFlags(flags)
|
||||
|
@ -108,6 +108,7 @@ func TestCreateCommand(t *testing.T) {
|
|||
flags.Parse([]string{"--allow-partially-failed", allowPartiallyFailed})
|
||||
flags.Parse([]string{"--item-operation-timeout", itemOperationTimeout})
|
||||
flags.Parse([]string{"--write-sparse-files", writeSparseFiles})
|
||||
flags.Parse([]string{"--parallel-files-download", "2"})
|
||||
client := velerotest.NewFakeControllerRuntimeClient(t).(kbclient.WithWatch)
|
||||
|
||||
f.On("Namespace").Return(mock.Anything)
|
||||
|
@ -144,6 +145,7 @@ func TestCreateCommand(t *testing.T) {
|
|||
require.Equal(t, allowPartiallyFailed, o.AllowPartiallyFailed.String())
|
||||
require.Equal(t, itemOperationTimeout, o.ItemOperationTimeout.String())
|
||||
require.Equal(t, writeSparseFiles, o.WriteSparseFiles.String())
|
||||
require.Equal(t, parallel, o.ParallelFilesDownload)
|
||||
})
|
||||
|
||||
t.Run("create a restore from schedule", func(t *testing.T) {
|
||||
|
|
|
@ -178,10 +178,7 @@ func DescribeRestore(ctx context.Context, kbClient kbclient.Client, restore *vel
|
|||
d.Println()
|
||||
d.Printf("Preserve Service NodePorts:\t%s\n", BoolPointerString(restore.Spec.PreserveNodePorts, "false", "true", "auto"))
|
||||
|
||||
if restore.Spec.UploaderConfig != nil && boolptr.IsSetToTrue(restore.Spec.UploaderConfig.WriteSparseFiles) {
|
||||
d.Println()
|
||||
DescribeUploaderConfigForRestore(d, restore.Spec)
|
||||
}
|
||||
describeUploaderConfigForRestore(d, restore.Spec)
|
||||
|
||||
d.Println()
|
||||
describeRestoreItemOperations(ctx, kbClient, d, restore, details, insecureSkipTLSVerify, caCertFile)
|
||||
|
@ -199,10 +196,18 @@ func DescribeRestore(ctx context.Context, kbClient kbclient.Client, restore *vel
|
|||
})
|
||||
}
|
||||
|
||||
// DescribeUploaderConfigForRestore describes uploader config in human-readable format
|
||||
func DescribeUploaderConfigForRestore(d *Describer, spec velerov1api.RestoreSpec) {
|
||||
d.Printf("Uploader config:\n")
|
||||
d.Printf("\tWrite Sparse Files:\t%T\n", boolptr.IsSetToTrue(spec.UploaderConfig.WriteSparseFiles))
|
||||
// describeUploaderConfigForRestore describes uploader config in human-readable format
|
||||
func describeUploaderConfigForRestore(d *Describer, spec velerov1api.RestoreSpec) {
|
||||
if spec.UploaderConfig != nil {
|
||||
d.Println()
|
||||
d.Printf("Uploader config:\n")
|
||||
if boolptr.IsSetToTrue(spec.UploaderConfig.WriteSparseFiles) {
|
||||
d.Printf("\tWrite Sparse Files:\t%v\n", boolptr.IsSetToTrue(spec.UploaderConfig.WriteSparseFiles))
|
||||
}
|
||||
if spec.UploaderConfig.ParallelFilesDownload > 0 {
|
||||
d.Printf("\tParallel Restore:\t%d\n", spec.UploaderConfig.ParallelFilesDownload)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func describeRestoreItemOperations(ctx context.Context, kbClient kbclient.Client, d *Describer, restore *velerov1api.Restore, details bool, insecureSkipTLSVerify bool, caCertPath string) {
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/itemoperation"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/results"
|
||||
)
|
||||
|
||||
|
@ -181,3 +182,58 @@ func TestDescribePodVolumeRestores(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
func TestDescribeUploaderConfigForRestore(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
spec velerov1api.RestoreSpec
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "UploaderConfigNil",
|
||||
spec: velerov1api.RestoreSpec{}, // Create a RestoreSpec with nil UploaderConfig
|
||||
expected: "",
|
||||
},
|
||||
{
|
||||
name: "test",
|
||||
spec: velerov1api.RestoreSpec{
|
||||
UploaderConfig: &velerov1api.UploaderConfigForRestore{
|
||||
WriteSparseFiles: boolptr.True(),
|
||||
ParallelFilesDownload: 4,
|
||||
},
|
||||
},
|
||||
expected: "\nUploader config:\n Write Sparse Files: true\n Parallel Restore: 4\n",
|
||||
},
|
||||
{
|
||||
name: "WriteSparseFiles test",
|
||||
spec: velerov1api.RestoreSpec{
|
||||
UploaderConfig: &velerov1api.UploaderConfigForRestore{
|
||||
WriteSparseFiles: boolptr.True(),
|
||||
},
|
||||
},
|
||||
expected: "\nUploader config:\n Write Sparse Files: true\n",
|
||||
},
|
||||
{
|
||||
name: "ParallelFilesDownload test",
|
||||
spec: velerov1api.RestoreSpec{
|
||||
UploaderConfig: &velerov1api.UploaderConfigForRestore{
|
||||
ParallelFilesDownload: 4,
|
||||
},
|
||||
},
|
||||
expected: "\nUploader config:\n Parallel Restore: 4\n",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
d := &Describer{
|
||||
Prefix: "",
|
||||
out: &tabwriter.Writer{},
|
||||
buf: &bytes.Buffer{},
|
||||
}
|
||||
d.out.Init(d.buf, 0, 8, 2, ' ', 0)
|
||||
describeUploaderConfigForRestore(d, tc.spec)
|
||||
d.out.Flush()
|
||||
assert.Equal(t, tc.expected, d.buf.String(), "Output should match expected")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -401,6 +401,8 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
|
|||
IgnorePermissionErrors: true,
|
||||
}
|
||||
|
||||
restoreConcurrency := runtime.NumCPU()
|
||||
|
||||
if len(uploaderCfg) > 0 {
|
||||
writeSparseFiles, err := uploaderutil.GetWriteSparseFiles(uploaderCfg)
|
||||
if err != nil {
|
||||
|
@ -409,9 +411,17 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
|
|||
if writeSparseFiles {
|
||||
fsOutput.WriteSparseFiles = true
|
||||
}
|
||||
|
||||
concurrency, err := uploaderutil.GetRestoreConcurrency(uploaderCfg)
|
||||
if err != nil {
|
||||
return 0, 0, errors.Wrap(err, "failed to get parallel restore uploader config")
|
||||
}
|
||||
if concurrency > 0 {
|
||||
restoreConcurrency = concurrency
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("Restore filesystem output %v", fsOutput)
|
||||
log.Debugf("Restore filesystem output %v, concurrency %d", fsOutput, restoreConcurrency)
|
||||
|
||||
err = fsOutput.Init(ctx)
|
||||
if err != nil {
|
||||
|
@ -426,7 +436,7 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
|
|||
}
|
||||
|
||||
stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{
|
||||
Parallel: runtime.NumCPU(),
|
||||
Parallel: restoreConcurrency,
|
||||
RestoreDirEntryAtDepth: math.MaxInt32,
|
||||
Cancel: cancleCh,
|
||||
ProgressCallback: func(ctx context.Context, stats restore.Stats) {
|
||||
|
|
|
@ -246,5 +246,9 @@ func (rp *resticProvider) parseRestoreExtraFlags(uploaderCfg map[string]string)
|
|||
extraFlags = append(extraFlags, "--sparse")
|
||||
}
|
||||
|
||||
if restoreConcurrency, err := uploaderutil.GetRestoreConcurrency(uploaderCfg); err == nil && restoreConcurrency > 0 {
|
||||
return extraFlags, errors.New("restic does not support parallel restore")
|
||||
}
|
||||
|
||||
return extraFlags, nil
|
||||
}
|
||||
|
|
|
@ -434,6 +434,13 @@ func TestParseUploaderConfig(t *testing.T) {
|
|||
},
|
||||
expectedFlags: []string{},
|
||||
},
|
||||
{
|
||||
name: "RestoreConcorrency",
|
||||
uploaderConfig: map[string]string{
|
||||
"Parallel": "5",
|
||||
},
|
||||
expectedFlags: []string{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
const (
|
||||
ParallelFilesUpload = "ParallelFilesUpload"
|
||||
WriteSparseFiles = "WriteSparseFiles"
|
||||
RestoreConcurrency = "ParallelFilesDownload"
|
||||
)
|
||||
|
||||
func StoreBackupConfig(config *velerov1api.UploaderConfigForBackup) map[string]string {
|
||||
|
@ -42,6 +43,10 @@ func StoreRestoreConfig(config *velerov1api.UploaderConfigForRestore) map[string
|
|||
} else {
|
||||
data[WriteSparseFiles] = strconv.FormatBool(false)
|
||||
}
|
||||
|
||||
if config.ParallelFilesDownload > 0 {
|
||||
data[RestoreConcurrency] = strconv.Itoa(config.ParallelFilesDownload)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
|
@ -68,3 +73,15 @@ func GetWriteSparseFiles(uploaderCfg map[string]string) (bool, error) {
|
|||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func GetRestoreConcurrency(uploaderCfg map[string]string) (int, error) {
|
||||
restoreConcurrency, ok := uploaderCfg[RestoreConcurrency]
|
||||
if ok {
|
||||
restoreConcurrencyInt, err := strconv.Atoi(restoreConcurrency)
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "failed to parse RestoreConcurrency config")
|
||||
}
|
||||
return restoreConcurrencyInt, nil
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
|
|
@ -78,6 +78,16 @@ func TestStoreRestoreConfig(t *testing.T) {
|
|||
WriteSparseFiles: "false", // Assuming default value is false for nil case
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Parallel is set",
|
||||
config: &velerov1api.UploaderConfigForRestore{
|
||||
ParallelFilesDownload: 5,
|
||||
},
|
||||
expectedData: map[string]string{
|
||||
RestoreConcurrency: "5",
|
||||
WriteSparseFiles: "false",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
|
@ -180,3 +190,53 @@ func TestGetWriteSparseFiles(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetRestoreConcurrency(t *testing.T) {
|
||||
testCases := []struct {
|
||||
Name string
|
||||
UploaderCfg map[string]string
|
||||
ExpectedResult int
|
||||
ExpectedError bool
|
||||
ExpectedErrorMsg string
|
||||
}{
|
||||
{
|
||||
Name: "Valid Configuration",
|
||||
UploaderCfg: map[string]string{RestoreConcurrency: "10"},
|
||||
ExpectedResult: 10,
|
||||
ExpectedError: false,
|
||||
},
|
||||
{
|
||||
Name: "Missing Configuration",
|
||||
UploaderCfg: map[string]string{},
|
||||
ExpectedResult: 0,
|
||||
ExpectedError: false,
|
||||
},
|
||||
{
|
||||
Name: "Invalid Configuration",
|
||||
UploaderCfg: map[string]string{RestoreConcurrency: "not_an_integer"},
|
||||
ExpectedResult: 0,
|
||||
ExpectedError: true,
|
||||
ExpectedErrorMsg: "failed to parse RestoreConcurrency config: strconv.Atoi: parsing \"not_an_integer\": invalid syntax",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
result, err := GetRestoreConcurrency(tc.UploaderCfg)
|
||||
|
||||
if tc.ExpectedError {
|
||||
if err.Error() != tc.ExpectedErrorMsg {
|
||||
t.Errorf("Expected error message %s, but got %s", tc.ExpectedErrorMsg, err.Error())
|
||||
}
|
||||
} else {
|
||||
if err != nil {
|
||||
t.Errorf("Expected no error, but got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if result != tc.ExpectedResult {
|
||||
t.Errorf("Expected result %d, but got %d", tc.ExpectedResult, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue