Merge branch 'main' into data-mover-ms-doc

pull/8144/head
Lyndon-Li 2024-08-29 16:14:19 +08:00
commit 866c2ab781
22 changed files with 155 additions and 39 deletions

View File

@ -20,4 +20,4 @@ jobs:
          days-before-pr-close: -1
          # Only issues made after Feb 09 2021.
          start-date: "2021-09-02T00:00:00"
          exempt-issue-labels: "Epic,Area/CLI,Area/Cloud/AWS,Area/Cloud/Azure,Area/Cloud/GCP,Area/Cloud/vSphere,Area/CSI,Area/Design,Area/Documentation,Area/Plugins,Bug,Enhancement/User,kind/requirement,kind/refactor,kind/tech-debt,limitation,Needs investigation,Needs triage,Needs Product,P0 - Hair on fire,P1 - Important,P2 - Long-term important,P3 - Wouldn't it be nice if...,Product Requirements,Restic - GA,Restic,release-blocker,Security"
          exempt-issue-labels: "Epic,Area/CLI,Area/Cloud/AWS,Area/Cloud/Azure,Area/Cloud/GCP,Area/Cloud/vSphere,Area/CSI,Area/Design,Area/Documentation,Area/Plugins,Bug,Enhancement/User,kind/requirement,kind/refactor,kind/tech-debt,limitation,Needs investigation,Needs triage,Needs Product,P0 - Hair on fire,P1 - Important,P2 - Long-term important,P3 - Wouldn't it be nice if...,Product Requirements,Restic - GA,Restic,release-blocker,Security,backlog"

View File

@ -0,0 +1 @@
Descriptive restore error when restoring into a terminating namespace.

View File

@ -0,0 +1 @@
Add resource modifier for velero restore describe CLI

View File

@ -0,0 +1 @@
Fix issue #8134, allow configuring resource requests/limits for data mover micro-service pods

View File

@ -0,0 +1 @@
Fix issue #8155, merge Kopia upstream commits for critical issue fixes and performance improvements

View File

@ -176,6 +176,27 @@ Below diagram shows how VGDP logs are redirected:
This log redirecting mechanism is thread safe since the hook acquires the write lock before writing the log buffer. This guarantees that the node-agent log is not corrupted after the redirection, and that the redirected logs and the original node-agent logs are not interleaved with each other.
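To make the locking discipline concrete, below is a minimal sketch of such a hook; the type and member names are hypothetical, not the actual Velero implementation:
```
import (
	"bytes"
	"sync"

	"github.com/sirupsen/logrus"
)

type redirectHook struct {
	mu  sync.Mutex    // write lock guarding the shared log buffer
	buf *bytes.Buffer // node-agent log buffer the VGDP logs are redirected into
}

func (h *redirectHook) Levels() []logrus.Level { return logrus.AllLevels }

// Fire is invoked for every redirected log entry; taking the lock before
// writing guarantees entries are appended whole, never interleaved.
func (h *redirectHook) Fire(entry *logrus.Entry) error {
	line, err := entry.String()
	if err != nil {
		return err
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	_, err = h.buf.WriteString(line)
	return err
}
```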
### Resource Control
The CPU/memory resources of the backupPod/restorePod are configurable, which means users can configure resources per volume backup/restore.
By default, the [Best Effort policy][5] is used, and users can change it through the ```node-agent-config``` configMap. Specifically, we add the below structures to the configMap:
```
type Configs struct {
	// PodResources is the resource config for various types of pods launched by node-agent, i.e., data mover pods.
	PodResources *PodResources `json:"podResources,omitempty"`
}

type PodResources struct {
	CPURequest    string `json:"cpuRequest,omitempty"`
	MemoryRequest string `json:"memoryRequest,omitempty"`
	CPULimit      string `json:"cpuLimit,omitempty"`
	MemoryLimit   string `json:"memoryLimit,omitempty"`
}
```
The string values must match Kubernetes Quantity expressions; for each resource, the "request" value must not be larger than the "limit" value. If any one of the values fails validation, all of the resource configurations are ignored.
The configurations are loaded by node-agent at start time, so users can change the values in the configMap at any time, but the changes won't take effect until node-agent restarts.
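As an illustration of this validation rule, a rough sketch could look like the following; this is only an illustration, not Velero's actual helper:
```
import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// parsePodResources applies the rule above: every value must be a valid
// Kubernetes Quantity and a request must not exceed its limit; any failure
// discards the whole configuration.
func parsePodResources(p PodResources) (corev1.ResourceRequirements, error) {
	res := corev1.ResourceRequirements{Requests: corev1.ResourceList{}, Limits: corev1.ResourceList{}}
	for _, e := range []struct {
		val  string
		name corev1.ResourceName
		list corev1.ResourceList
	}{
		{p.CPURequest, corev1.ResourceCPU, res.Requests},
		{p.MemoryRequest, corev1.ResourceMemory, res.Requests},
		{p.CPULimit, corev1.ResourceCPU, res.Limits},
		{p.MemoryLimit, corev1.ResourceMemory, res.Limits},
	} {
		if e.val == "" {
			continue // omitted values keep the Best Effort default
		}
		q, err := resource.ParseQuantity(e.val)
		if err != nil {
			return corev1.ResourceRequirements{}, fmt.Errorf("invalid quantity %q: %w", e.val, err)
		}
		e.list[e.name] = q
	}
	for _, name := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
		req, hasReq := res.Requests[name]
		lim, hasLim := res.Limits[name]
		if hasReq && hasLim && req.Cmp(lim) > 0 {
			return corev1.ResourceRequirements{}, fmt.Errorf("%s request %s is larger than limit %s", name, req.String(), lim.String())
		}
	}
	return res, nil
}
```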
## node-agent
node-agent is still required. Even though VGDP no longer runs inside node-agent, node-agent still hosts the data mover controller, which reconciles DUCR/DDCR and operates DUCR/DDCR in the other steps before the VGDP instance is started, e.g., Accept, Expose, etc.
Privileged mode and root user are no longer required for node-agent by Volume Snapshot Data Movement; however, they are still required by PVB(PodVolumeBackup) and PVR(PodVolumeRestore). Therefore, we will keep the node-agent daemonset as is; any users who don't use PVB/PVR and have concerns about the privileged mode/root user need to manually modify the daemonset spec to remove these dependencies.
@ -198,4 +219,5 @@ CLI is not changed.
[2]: ../volume-snapshot-data-movement/volume-snapshot-data-movement.md
[3]: https://kubernetes.io/blog/2022/09/02/cosi-kubernetes-object-storage-management/
[4]: ../Implemented/node-agent-concurrency.md
[5]: https://kubernetes.io/docs/concepts/workloads/pods/pod-qos/

go.mod
View File

@ -177,4 +177,4 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
)
replace github.com/kopia/kopia => github.com/project-velero/kopia v0.0.0-20240417031915-e07d5b7de567
replace github.com/kopia/kopia => github.com/project-velero/kopia v0.0.0-20240829032136-7fca59662a06

go.sum
View File

@ -613,8 +613,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021/go.mod h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA=
github.com/project-velero/kopia v0.0.0-20240417031915-e07d5b7de567 h1:Gb5eZktsqgnhOfQmKWIlVA9yKvschdr8n8d6y1RLFA0=
github.com/project-velero/kopia v0.0.0-20240417031915-e07d5b7de567/go.mod h1:2HlqZb/N6SNsWUCZzyeh9Lw29PeDRHDkMUiuQCEWt4Y=
github.com/project-velero/kopia v0.0.0-20240829032136-7fca59662a06 h1:QLtEHOokfqpsW99nDFyU2IB47LsGhDqMICGPA+ZgjpM=
github.com/project-velero/kopia v0.0.0-20240829032136-7fca59662a06/go.mod h1:2HlqZb/N6SNsWUCZzyeh9Lw29PeDRHDkMUiuQCEWt4Y=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=

View File

@ -60,6 +60,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
cacheutil "k8s.io/client-go/tools/cache"
@ -295,19 +296,31 @@ func (s *nodeAgentServer) run() {
	var loadAffinity *nodeagent.LoadAffinity
	if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
		loadAffinity = s.dataPathConfigs.LoadAffinity[0]
		s.logger.Infof("Using customized loadAffinity %v", loadAffinity)
	}
	var backupPVCConfig map[string]nodeagent.BackupPVC
	if s.dataPathConfigs != nil && s.dataPathConfigs.BackupPVCConfig != nil {
		backupPVCConfig = s.dataPathConfigs.BackupPVCConfig
		s.logger.Infof("Using customized backupPVC config %v", backupPVCConfig)
	}
	dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
	podResources := v1.ResourceRequirements{}
	if s.dataPathConfigs != nil && s.dataPathConfigs.PodResources != nil {
		if res, err := kube.ParseResourceRequirements(s.dataPathConfigs.PodResources.CPURequest, s.dataPathConfigs.PodResources.MemoryRequest, s.dataPathConfigs.PodResources.CPULimit, s.dataPathConfigs.PodResources.MemoryLimit); err != nil {
			s.logger.WithError(err).Warn("Pod resource requirements are invalid, ignore")
		} else {
			podResources = res
			s.logger.Infof("Using customized pod resource requirements %v", s.dataPathConfigs.PodResources)
		}
	}
	dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, podResources, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
	if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
		s.logger.WithError(err).Fatal("Unable to create the data upload controller")
	}
	dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
	dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
	if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
		s.logger.WithError(err).Fatal("Unable to create the data download controller")
	}

View File

@ -27,6 +27,7 @@ import (
"github.com/vmware-tanzu/velero/internal/volume"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
@ -39,7 +40,15 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/results"
)
func DescribeRestore(ctx context.Context, kbClient kbclient.Client, restore *velerov1api.Restore, podVolumeRestores []velerov1api.PodVolumeRestore, details bool, insecureSkipTLSVerify bool, caCertFile string) string {
func DescribeRestore(
	ctx context.Context,
	kbClient kbclient.Client,
	restore *velerov1api.Restore,
	podVolumeRestores []velerov1api.PodVolumeRestore,
	details bool,
	insecureSkipTLSVerify bool,
	caCertFile string,
) string {
	return Describe(func(d *Describer) {
		d.DescribeMetadata(restore.ObjectMeta)
@ -196,6 +205,11 @@ func DescribeRestore(ctx context.Context, kbClient kbclient.Client, restore *vel
		d.Println()
		d.Printf("Preserve Service NodePorts:\t%s\n", BoolPointerString(restore.Spec.PreserveNodePorts, "false", "true", "auto"))
		if restore.Spec.ResourceModifier != nil {
			d.Println()
			DescribeResourceModifier(d, restore.Spec.ResourceModifier)
		}
		describeUploaderConfigForRestore(d, restore.Spec)
		d.Println()
@ -472,3 +486,10 @@ func describeRestoreResourceList(ctx context.Context, kbClient kbclient.Client,
		d.Printf("\t%s:\n\t\t- %s\n", gvk, strings.Join(resourceList[gvk], "\n\t\t- "))
	}
}
// DescribeResourceModifier describes the resource modifier in human-readable format
func DescribeResourceModifier(d *Describer, resModifier *v1.TypedLocalObjectReference) {
	d.Printf("Resource modifier:\n")
	d.Printf("\tType:\t%s\n", resModifier.Kind)
	d.Printf("\tName:\t%s\n", resModifier.Name)
}

View File

@ -2,15 +2,16 @@ package output
import (
"bytes"
"fmt"
"testing"
"text/tabwriter"
"time"
"github.com/vmware-tanzu/velero/internal/volume"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"github.com/vmware-tanzu/velero/internal/volume"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/itemoperation"
@ -389,3 +390,28 @@ CSI Snapshot Restores:
		})
	}
}
func TestDescribeResourceModifier(t *testing.T) {
	d := &Describer{
		Prefix: "",
		out:    &tabwriter.Writer{},
		buf:    &bytes.Buffer{},
	}
	d.out.Init(d.buf, 0, 8, 2, ' ', 0)
	DescribeResourceModifier(d, &v1.TypedLocalObjectReference{
		APIGroup: &v1.SchemeGroupVersion.Group,
		Kind:     "ConfigMap",
		Name:     "resourceModifier",
	})
	d.out.Flush()
	expectOutput := `Resource modifier:
Type: ConfigMap
Name: resourceModifier
`
	fmt.Println(d.buf.String())
	require.Equal(t, expectOutput, d.buf.String())
}

View File

@ -60,12 +60,13 @@ type DataDownloadReconciler struct {
	restoreExposer exposer.GenericRestoreExposer
	nodeName string
	dataPathMgr *datapath.Manager
	podResources v1.ResourceRequirements
	preparingTimeout time.Duration
	metrics *metrics.ServerMetrics
}
func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
	nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
	podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
	return &DataDownloadReconciler{
		client: client,
		kubeClient: kubeClient,
@ -75,6 +76,7 @@ func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeCl
		nodeName: nodeName,
		restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
		dataPathMgr: dataPathMgr,
		podResources: podResources,
		preparingTimeout: preparingTimeout,
		metrics: metrics,
	}
@ -179,7 +181,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
	// Expose() triggers the creation of a pod whose volume is restored from the given volume snapshot,
	// but that pod may not be on the same node as the current controller, so we need to return here.
	// Only the controller on the same node will do the rest of the work.
	err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)
	err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, r.podResources, dd.Spec.OperationTimeout.Duration)
	if err != nil {
		if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
			if !apierrors.IsNotFound(err) {

View File

@ -140,7 +140,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...
	dataPathMgr := datapath.NewManager(1)
	return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
	return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}
func TestDataDownloadReconcile(t *testing.T) {
@ -441,7 +441,7 @@ func TestDataDownloadReconcile(t *testing.T) {
			r.restoreExposer = func() exposer.GenericRestoreExposer {
				ep := exposermockes.NewGenericRestoreExposer(t)
				if test.isExposeErr {
					ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer"))
					ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer"))
				} else if test.notNilExpose {
					hostingPod := builder.ForPod("test-ns", "test-name").Volumes(&corev1.Volume{Name: "test-pvc"}).Result()
					hostingPod.ObjectMeta.SetUID("test-uid")
@ -959,7 +959,7 @@ func (dt *ddResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler,
	return dt.resumeErr
}
func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error {
func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error {
	return nil
}

View File

@ -73,13 +73,14 @@ type DataUploadReconciler struct {
	dataPathMgr *datapath.Manager
	loadAffinity *nodeagent.LoadAffinity
	backupPVCConfig map[string]nodeagent.BackupPVC
	podResources corev1.ResourceRequirements
	preparingTimeout time.Duration
	metrics *metrics.ServerMetrics
}
func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface,
	dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, backupPVCConfig map[string]nodeagent.BackupPVC, clock clocks.WithTickerAndDelayedExecution,
	nodeName string, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
	dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, backupPVCConfig map[string]nodeagent.BackupPVC, podResources corev1.ResourceRequirements,
	clock clocks.WithTickerAndDelayedExecution, nodeName string, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
	return &DataUploadReconciler{
		client: client,
		mgr: mgr,
@ -92,6 +93,7 @@ func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClie
		dataPathMgr: dataPathMgr,
		loadAffinity: loadAffinity,
		backupPVCConfig: backupPVCConfig,
		podResources: podResources,
		preparingTimeout: preparingTimeout,
		metrics: metrics,
	}
@ -795,6 +797,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
			VolumeSize: pvc.Spec.Resources.Requests[corev1.ResourceStorage],
			Affinity: r.loadAffinity,
			BackupPVCConfig: r.backupPVCConfig,
			Resources: r.podResources,
		}, nil
	}
	return nil, nil

View File

@ -232,7 +232,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
	fakeKubeClient := clientgofake.NewSimpleClientset(daemonSet)
	return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, map[string]nodeagent.BackupPVC{},
		testclocks.NewFakeClock(now), "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
		corev1.ResourceRequirements{}, testclocks.NewFakeClock(now), "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}
func dataUploadBuilder() *builder.DataUploadBuilder {

View File

@ -70,6 +70,9 @@ type CSISnapshotExposeParam struct {
	// BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement
	BackupPVCConfig map[string]nodeagent.BackupPVC
	// Resources defines the resource requirements of the hosting pod
	Resources corev1.ResourceRequirements
}
// CSISnapshotExposeWaitParam defines the input param for WaitExposed of CSI snapshots
@ -191,7 +194,7 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje
		}
	}()
	backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.OperationTimeout, csiExposeParam.HostingPodLabels, csiExposeParam.Affinity)
	backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.OperationTimeout, csiExposeParam.HostingPodLabels, csiExposeParam.Affinity, csiExposeParam.Resources)
	if err != nil {
		return errors.Wrap(err, "error to create backup pod")
	}
@ -423,7 +426,7 @@ func (e *csiSnapshotExposer) createBackupPVC(ctx context.Context, ownerObject co
}
func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, operationTimeout time.Duration,
	label map[string]string, affinity *nodeagent.LoadAffinity) (*corev1.Pod, error) {
	label map[string]string, affinity *nodeagent.LoadAffinity, resources corev1.ResourceRequirements) (*corev1.Pod, error) {
	podName := ownerObject.Name
	containerName := string(ownerObject.UID)
@ -513,6 +516,7 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co
					VolumeMounts: volumeMounts,
					VolumeDevices: volumeDevices,
					Env: podInfo.env,
					Resources: resources,
				},
			},
			ServiceAccountName: podInfo.serviceAccount,

View File

@ -37,7 +37,7 @@ import (
// GenericRestoreExposer is the interface for a generic restore exposer
type GenericRestoreExposer interface {
	// Expose starts the restore expose process; the expose process may take a long time
	Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error
	Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error
	// GetExposed polls the status of the expose.
	// If the expose is accessible by the current caller, it waits for the expose to be ready and returns the expose result.
@ -69,7 +69,7 @@ type genericRestoreExposer struct {
	log logrus.FieldLogger
}
func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, timeout time.Duration) error {
func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, resources corev1.ResourceRequirements, timeout time.Duration) error {
	curLog := e.log.WithFields(logrus.Fields{
		"owner": ownerObject.Name,
		"target PVC": targetPVCName,
@ -87,7 +87,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.O
		return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName)
	}
	restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode)
	restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode, resources)
	if err != nil {
		return errors.Wrapf(err, "error to create restore pod")
	}
@ -297,7 +297,7 @@ func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject co
}
func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, targetPVC *corev1.PersistentVolumeClaim,
	operationTimeout time.Duration, label map[string]string, selectedNode string) (*corev1.Pod, error) {
	operationTimeout time.Duration, label map[string]string, selectedNode string, resources corev1.ResourceRequirements) (*corev1.Pod, error) {
	restorePodName := ownerObject.Name
	restorePVCName := ownerObject.Name
@ -370,6 +370,7 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec
					VolumeMounts: volumeMounts,
					VolumeDevices: volumeDevices,
					Env: podInfo.env,
					Resources: resources,
				},
			},
			ServiceAccountName: podInfo.serviceAccount,

View File

@ -180,7 +180,7 @@ func TestRestoreExpose(t *testing.T) {
				}
			}
			err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, time.Millisecond)
			err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, corev1.ResourceRequirements{}, time.Millisecond)
			assert.EqualError(t, err, test.err)
		})
	}

View File

@ -26,17 +26,17 @@ func (_m *GenericRestoreExposer) CleanUp(_a0 context.Context, _a1 v1.ObjectRefer
	_m.Called(_a0, _a1)
}
// Expose provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5
func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 time.Duration) error {
	ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5)
// Expose provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5, _a6
func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 v1.ResourceRequirements, _a6 time.Duration) error {
	ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6)
	if len(ret) == 0 {
		panic("no return value specified for Expose")
	}
	var r0 error
	if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, time.Duration) error); ok {
		r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5)
	if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, v1.ResourceRequirements, time.Duration) error); ok {
		r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6)
	} else {
		r0 = ret.Error(0)
	}

View File

@ -70,6 +70,13 @@ type BackupPVC struct {
	ReadOnly bool `json:"readOnly,omitempty"`
}
type PodResources struct {
	CPURequest string `json:"cpuRequest,omitempty"`
	MemoryRequest string `json:"memoryRequest,omitempty"`
	CPULimit string `json:"cpuLimit,omitempty"`
	MemoryLimit string `json:"memoryLimit,omitempty"`
}
type Configs struct {
	// LoadConcurrency is the config for data path load concurrency per node.
	LoadConcurrency *LoadConcurrency `json:"loadConcurrency,omitempty"`
@ -79,6 +86,9 @@ type Configs struct {
	// BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement
	BackupPVCConfig map[string]BackupPVC `json:"backupPVC,omitempty"`
	// PodResources is the resource config for various types of pods launched by node-agent, i.e., data mover pods.
	PodResources *PodResources `json:"podResources,omitempty"`
}
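As an illustration, node-agent-config data that sets only the new podResources field might look like the following; the values are hypothetical and the field names come from the json tags above:
```
// exampleNodeAgentConfig is hypothetical configMap data matching Configs;
// all other fields are left out (they are all omitempty).
const exampleNodeAgentConfig = `{
	"podResources": {
		"cpuRequest": "500m",
		"memoryRequest": "512Mi",
		"cpuLimit": "2",
		"memoryLimit": "1Gi"
	}
}`
```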
// IsRunning checks if the node agent daemonset is running properly. If not, return the error found

View File

@ -41,7 +41,7 @@ var _ restore.Output = &BlockOutput{}
const bufferSize = 128 * 1024
func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remoteFile fs.File) error {
func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remoteFile fs.File, progressCb restore.FileWriteProgress) error {
	remoteReader, err := remoteFile.Open(ctx)
	if err != nil {
		return errors.Wrapf(err, "failed to open remote file %s", remoteFile.Name())
@ -70,6 +70,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
		offset := 0
		for bytesToWrite > 0 {
			if bytesWritten, err := targetFile.Write(buffer[offset:bytesToWrite]); err == nil {
				progressCb(int64(bytesWritten))
				bytesToWrite -= bytesWritten
				offset += bytesWritten
			} else {

View File

@ -65,18 +65,23 @@ func NamespaceAndName(objMeta metav1.Object) string {
}
// EnsureNamespaceExistsAndIsReady attempts to create the provided Kubernetes namespace.
// It returns three values: a bool indicating whether or not the namespace is ready,
// a bool indicating whether or not the namespace was created and an error if the creation failed
// for a reason other than that the namespace already exists. Note that in the case where the
// namespace already exists and is not ready, this function will return (false, false, nil).
// If the namespace exists and is marked for deletion, this function will wait up to the timeout for it to fully delete.
func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client corev1client.NamespaceInterface, timeout time.Duration) (bool, bool, error) {
// It returns three values:
// - a bool indicating whether or not the namespace is ready,
// - a bool indicating whether or not the namespace was created
// - an error if one occurred.
//
// examples:
//
//	If the namespace already exists and is not ready, this function will return (false, false, nil).
//	If the namespace exists and is marked for deletion, this function will wait up to the timeout for it to fully delete.
func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client corev1client.NamespaceInterface, timeout time.Duration) (ready bool, nsCreated bool, err error) {
	// nsCreated tells whether the namespace was created by this method
	// required for keeping track of number of restored items
	var nsCreated bool
	var ready bool
	err := wait.PollUntilContextTimeout(context.Background(), time.Second, timeout, true, func(ctx context.Context) (bool, error) {
	// if namespace is marked for deletion, and we timed out, report an error
	var terminatingNamespace bool
	err = wait.PollUntilContextTimeout(context.Background(), time.Second, timeout, true, func(ctx context.Context) (bool, error) {
		clusterNS, err := client.Get(ctx, namespace.Name, metav1.GetOptions{})
		// if namespace is marked for deletion, and we timed out, report an error
		if apierrors.IsNotFound(err) {
			// Namespace isn't in cluster, we're good to create.
@ -90,6 +95,7 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core
		if clusterNS != nil && (clusterNS.GetDeletionTimestamp() != nil || clusterNS.Status.Phase == corev1api.NamespaceTerminating) {
			// Marked for deletion, keep waiting
			terminatingNamespace = true
			return false, nil
		}
@ -100,6 +106,9 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core
	// err will be set if we timed out or encountered issues retrieving the namespace,
	if err != nil {
		if terminatingNamespace {
			return false, nsCreated, errors.Wrapf(err, "timed out waiting for terminating namespace %s to disappear before restoring", namespace.Name)
		}
		return false, nsCreated, errors.Wrapf(err, "error getting namespace %s", namespace.Name)
	}
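To illustrate the contract documented above, a caller might consume the three return values as follows; this is a hedged sketch, and ns, nsClient, logger, and itemCount are assumed surroundings, not code from this commit:
```
// ns is the *corev1api.Namespace to ensure; nsClient is a
// corev1client.NamespaceInterface; both are assumed to exist.
ready, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(ns, nsClient, 10*time.Minute)
switch {
case err != nil:
	// Covers API errors as well as the timeout spent waiting for a
	// terminating namespace to disappear before restoring.
	logger.WithError(err).Errorf("namespace %s is not usable for restore", ns.Name)
case !ready:
	// The namespace exists but is not ready; nothing was created.
	logger.Warnf("namespace %s exists but is not ready", ns.Name)
case nsCreated:
	// The namespace was created by this call; track it as a restored item.
	itemCount++
}
```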