From b7d997130d2f173076d352e64040373591d7b600 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 29 May 2025 17:06:20 +0800 Subject: [PATCH 1/3] issue 8960: implement PodVolume exposer for PVB/PVR Signed-off-by: Lyndon-Li --- changelogs/unreleased/8985-Lyndon-Li | 1 + pkg/cmd/cli/nodeagent/server.go | 4 +- .../pod_volume_backup_controller.go | 7 +- .../pod_volume_restore_controller.go | 7 +- pkg/exposer/host_path.go | 31 +- pkg/exposer/host_path_test.go | 72 ++- pkg/exposer/pod_volume.go | 404 ++++++++++++ pkg/exposer/pod_volume_test.go | 590 ++++++++++++++++++ pkg/exposer/types.go | 12 +- pkg/nodeagent/node_agent.go | 40 ++ pkg/nodeagent/node_agent_test.go | 161 +++++ pkg/util/kube/utils.go | 28 +- pkg/util/kube/utils_test.go | 25 +- 13 files changed, 1333 insertions(+), 49 deletions(-) create mode 100644 changelogs/unreleased/8985-Lyndon-Li create mode 100644 pkg/exposer/pod_volume.go create mode 100644 pkg/exposer/pod_volume_test.go diff --git a/changelogs/unreleased/8985-Lyndon-Li b/changelogs/unreleased/8985-Lyndon-Li new file mode 100644 index 000000000..7bb89251c --- /dev/null +++ b/changelogs/unreleased/8985-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #8960, implement PodVolume exposer for PVB/PVR \ No newline at end of file diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index c7a2576c9..0dca03665 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -305,14 +305,14 @@ func (s *nodeAgentServer) run() { credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore} repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout) - pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, + pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.mgr.GetScheme(), s.metrics, s.logger) if err := pvbReconciler.SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup) } - if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil { + if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 03cb3fe8e..254a39c91 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" clocks "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -48,10 +49,11 @@ import ( const pVBRRequestor string = "pod-volume-backup-restore" // NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance -func NewPodVolumeBackupReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, +func NewPodVolumeBackupReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr 
*datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, scheme *runtime.Scheme, metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler { return &PodVolumeBackupReconciler{ Client: client, + kubeClient: kubeClient, logger: logger.WithField("controller", "PodVolumeBackup"), repositoryEnsurer: ensurer, credentialGetter: credentialGetter, @@ -67,6 +69,7 @@ func NewPodVolumeBackupReconciler(client client.Client, dataPathMgr *datapath.Ma // PodVolumeBackupReconciler reconciles a PodVolumeBackup object type PodVolumeBackupReconciler struct { client.Client + kubeClient kubernetes.Interface scheme *runtime.Scheme clock clocks.WithTickerAndDelayedExecution metrics *metrics.ServerMetrics @@ -155,7 +158,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ return r.errorOut(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name), log) } - path, err := exposer.GetPodVolumeHostPath(ctx, &pod, pvb.Spec.Volume, r.Client, r.fileSystem, log) + path, err := exposer.GetPodVolumeHostPath(ctx, &pod, pvb.Spec.Volume, r.kubeClient, r.fileSystem, log) if err != nil { r.closeDataPath(ctx, pvb.Name) return r.errorOut(ctx, &pvb, err, "error exposing host path for pod volume", log) diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index f4645657f..e65d1b606 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" clocks "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -49,10 +50,11 @@ import ( "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) -func NewPodVolumeRestoreReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, +func NewPodVolumeRestoreReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, logger logrus.FieldLogger) *PodVolumeRestoreReconciler { return &PodVolumeRestoreReconciler{ Client: client, + kubeClient: kubeClient, logger: logger.WithField("controller", "PodVolumeRestore"), repositoryEnsurer: ensurer, credentialGetter: credentialGetter, @@ -64,6 +66,7 @@ func NewPodVolumeRestoreReconciler(client client.Client, dataPathMgr *datapath.M type PodVolumeRestoreReconciler struct { client.Client + kubeClient kubernetes.Interface logger logrus.FieldLogger repositoryEnsurer *repository.Ensurer credentialGetter *credentials.CredentialGetter @@ -135,7 +138,7 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req return c.errorOut(ctx, pvr, err, "error to update status to in progress", log) } - volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.Client, c.fileSystem, log) + volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.kubeClient, c.fileSystem, log) if err != nil { c.closeDataPath(ctx, pvr.Name) return c.errorOut(ctx, pvr, err, "error exposing host path for pod volume", log) diff --git a/pkg/exposer/host_path.go b/pkg/exposer/host_path.go index d249dda39..a5668f8c0 100644 --- a/pkg/exposer/host_path.go +++ b/pkg/exposer/host_path.go @@ -19,13 +19,15 @@ package exposer import ( 
"context" "fmt" + "strings" "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1api "k8s.io/api/core/v1" - ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/client-go/kubernetes" "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" @@ -37,17 +39,17 @@ var singlePathMatch = kube.SinglePathMatch // GetPodVolumeHostPath returns a path that can be accessed from the host for a given volume of a pod func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName string, - cli ctrlclient.Client, fs filesystem.Interface, log logrus.FieldLogger) (datapath.AccessPoint, error) { + kubeClient kubernetes.Interface, fs filesystem.Interface, log logrus.FieldLogger) (datapath.AccessPoint, error) { logger := log.WithField("pod name", pod.Name).WithField("pod UID", pod.GetUID()).WithField("volume", volumeName) - volDir, err := getVolumeDirectory(ctx, logger, pod, volumeName, cli) + volDir, err := getVolumeDirectory(ctx, logger, pod, volumeName, kubeClient) if err != nil { return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume directory name for volume %s in pod %s", volumeName, pod.Name) } logger.WithField("volDir", volDir).Info("Got volume dir") - volMode, err := getVolumeMode(ctx, logger, pod, volumeName, cli) + volMode, err := getVolumeMode(ctx, logger, pod, volumeName, kubeClient) if err != nil { return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume mode for volume %s in pod %s", volumeName, pod.Name) } @@ -57,7 +59,7 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName st volSubDir = "volumeDevices" } - pathGlob := fmt.Sprintf("/host_pods/%s/%s/*/%s", string(pod.GetUID()), volSubDir, volDir) + pathGlob := fmt.Sprintf("/%s/%s/%s/*/%s", nodeagent.HostPodVolumeMountPoint, string(pod.GetUID()), volSubDir, volDir) logger.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob") path, err := singlePathMatch(pathGlob, fs, logger) @@ -72,3 +74,22 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName st VolMode: volMode, }, nil } + +var getHostPodPath = nodeagent.GetHostPodPath + +func ExtractPodVolumeHostPath(ctx context.Context, path string, kubeClient kubernetes.Interface, veleroNamespace string, osType string) (string, error) { + podPath, err := getHostPodPath(ctx, kubeClient, veleroNamespace, osType) + if err != nil { + return "", errors.Wrap(err, "error getting host pod path from node-agent") + } + + if osType == kube.NodeOSWindows { + podPath = strings.Replace(podPath, "/", "\\", -1) + } + + if osType == kube.NodeOSWindows { + return strings.Replace(path, "\\"+nodeagent.HostPodVolumeMountPoint, podPath, 1), nil + } else { + return strings.Replace(path, "/"+nodeagent.HostPodVolumeMountPoint, podPath, 1), nil + } +} diff --git a/pkg/exposer/host_path_test.go b/pkg/exposer/host_path_test.go index 9faff5f14..411833f64 100644 --- a/pkg/exposer/host_path_test.go +++ b/pkg/exposer/host_path_test.go @@ -18,26 +18,29 @@ package exposer import ( "context" + "fmt" "testing" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" corev1api "k8s.io/api/core/v1" - ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/client-go/kubernetes" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/builder" + 
"github.com/vmware-tanzu/velero/pkg/nodeagent" velerotest "github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" + "github.com/vmware-tanzu/velero/pkg/util/kube" ) func TestGetPodVolumeHostPath(t *testing.T) { tests := []struct { name string - getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error) - getVolumeModeFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error) + getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error) + getVolumeModeFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (uploader.PersistentVolumeMode, error) pathMatchFunc func(string, filesystem.Interface, logrus.FieldLogger) (string, error) pod *corev1api.Pod pvc string @@ -45,7 +48,7 @@ func TestGetPodVolumeHostPath(t *testing.T) { }{ { name: "get volume dir fail", - getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error) { + getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error) { return "", errors.New("fake-error-1") }, pod: builder.ForPod(velerov1api.DefaultNamespace, "fake-pod-1").Result(), @@ -54,10 +57,10 @@ func TestGetPodVolumeHostPath(t *testing.T) { }, { name: "single path match fail", - getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error) { + getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error) { return "", nil }, - getVolumeModeFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error) { + getVolumeModeFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (uploader.PersistentVolumeMode, error) { return uploader.PersistentVolumeFilesystem, nil }, pathMatchFunc: func(string, filesystem.Interface, logrus.FieldLogger) (string, error) { @@ -69,7 +72,7 @@ func TestGetPodVolumeHostPath(t *testing.T) { }, { name: "get block volume dir success", - getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) ( + getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) ( string, error) { return "fake-pvc-1", nil }, @@ -102,3 +105,58 @@ func TestGetPodVolumeHostPath(t *testing.T) { }) } } + +func TestExtractPodVolumeHostPath(t *testing.T) { + tests := []struct { + name string + getHostPodPathFunc func(context.Context, kubernetes.Interface, string, string) (string, error) + path string + osType string + expectedErr string + expected string + }{ + { + name: "get host pod path error", + getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) { + return "", errors.New("fake-error-1") + }, + + expectedErr: "error getting host pod path from node-agent: fake-error-1", + }, + { + name: "Windows os", + getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) { + return "/var/lib/kubelet/pods", nil + }, + path: fmt.Sprintf("\\%s\\pod-id-xxx\\volumes\\kubernetes.io~csi\\pvc-id-xxx\\mount", nodeagent.HostPodVolumeMountPoint), + osType: kube.NodeOSWindows, + expected: 
"\\var\\lib\\kubelet\\pods\\pod-id-xxx\\volumes\\kubernetes.io~csi\\pvc-id-xxx\\mount", + }, + { + name: "linux OS", + getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) { + return "/var/lib/kubelet/pods", nil + }, + path: fmt.Sprintf("/%s/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nodeagent.HostPodVolumeMountPoint), + osType: kube.NodeOSLinux, + expected: "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.getHostPodPathFunc != nil { + getHostPodPath = test.getHostPodPathFunc + } + + path, err := ExtractPodVolumeHostPath(context.Background(), test.path, nil, "", test.osType) + + if test.expectedErr != "" { + assert.EqualError(t, err, test.expectedErr) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, path) + } + }) + } +} diff --git a/pkg/exposer/pod_volume.go b/pkg/exposer/pod_volume.go new file mode 100644 index 000000000..6e3910d12 --- /dev/null +++ b/pkg/exposer/pod_volume.go @@ -0,0 +1,404 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exposer + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vmware-tanzu/velero/pkg/nodeagent" + "github.com/vmware-tanzu/velero/pkg/util/boolptr" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" + "github.com/vmware-tanzu/velero/pkg/util/kube" +) + +const ( + PodVolumeExposeTypeBackup = "pod-volume-backup" + PodVolumeExposeTypeRestore = "pod-volume-restore" +) + +// PodVolumeExposeParam define the input param for pod volume Expose +type PodVolumeExposeParam struct { + // ClientPodName is the name of pod to be backed up or restored + ClientPodName string + + // ClientNamespace is the namespace to be backed up or restored + ClientNamespace string + + // ClientNamespace is the pod volume for the client PVC + ClientPodVolume string + + // HostingPodLabels is the labels that are going to apply to the hosting pod + HostingPodLabels map[string]string + + // HostingPodAnnotations is the annotations that are going to apply to the hosting pod + HostingPodAnnotations map[string]string + + // Resources defines the resource requirements of the hosting pod + Resources corev1.ResourceRequirements + + // OperationTimeout specifies the time wait for resources operations in Expose + OperationTimeout time.Duration + + // Type specifies the type of the expose, either backup or erstore + Type string +} + +// PodVolumeExposer is the interfaces for a pod volume exposer +type PodVolumeExposer interface { + // Expose starts the process to a pod volume expose, the expose process may take long time + 
Expose(context.Context, corev1.ObjectReference, PodVolumeExposeParam) error + + // GetExposed polls the status of the expose. + // If the expose is accessible by the current caller, it waits the expose ready and returns the expose result. + // Otherwise, it returns nil as the expose result without an error. + GetExposed(context.Context, corev1.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error) + + // PeekExposed tests the status of the expose. + // If the expose is incomplete but not recoverable, it returns an error. + // Otherwise, it returns nil immediately. + PeekExposed(context.Context, corev1.ObjectReference) error + + // DiagnoseExpose generate the diagnostic info when the expose is not finished for a long time. + // If it finds any problem, it returns an string about the problem. + DiagnoseExpose(context.Context, corev1.ObjectReference) string + + // CleanUp cleans up any objects generated during the restore expose + CleanUp(context.Context, corev1.ObjectReference) +} + +// NewPodVolumeExposer creates a new instance of pod volume exposer +func NewPodVolumeExposer(kubeClient kubernetes.Interface, log logrus.FieldLogger) PodVolumeExposer { + return &podVolumeExposer{ + kubeClient: kubeClient, + fs: filesystem.NewFileSystem(), + log: log, + } +} + +type podVolumeExposer struct { + kubeClient kubernetes.Interface + fs filesystem.Interface + log logrus.FieldLogger +} + +var getPodVolumeHostPath = GetPodVolumeHostPath +var extractPodVolumeHostPath = ExtractPodVolumeHostPath + +func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param PodVolumeExposeParam) error { + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + "client pod": param.ClientPodName, + "client pod volume": param.ClientPodVolume, + "client namespace": param.ClientNamespace, + "type": param.Type, + }) + + pod, err := e.kubeClient.CoreV1().Pods(param.ClientNamespace).Get(ctx, param.ClientPodName, metav1.GetOptions{}) + if err != nil { + return errors.Wrapf(err, "error getting client pod %s", param.ClientPodName) + } + + if pod.Spec.NodeName == "" { + return errors.Errorf("client pod %s doesn't have a node name", pod.Name) + } + + nodeOS, err := kube.GetNodeOS(ctx, pod.Spec.NodeName, e.kubeClient.CoreV1()) + if err != nil { + return errors.Wrapf(err, "error getting OS for node %s", pod.Spec.NodeName) + } + + curLog.Infof("Client pod is running in node %s, os %s", pod.Spec.NodeName, nodeOS) + + path, err := getPodVolumeHostPath(ctx, pod, param.ClientPodVolume, e.kubeClient, e.fs, e.log) + if err != nil { + return errors.Wrapf(err, "error to get pod volume path") + } + + path.ByPath, err = extractPodVolumeHostPath(ctx, path.ByPath, e.kubeClient, ownerObject.Namespace, nodeOS) + if err != nil { + return errors.Wrapf(err, "error to extract pod volume path") + } + + curLog.WithField("path", path).Infof("Host path is retrieved for pod %s, volume %s", param.ClientPodName, param.ClientPodVolume) + + hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, pod.Spec.NodeName, param.Resources, nodeOS) + if err != nil { + return errors.Wrapf(err, "error to create hosting pod") + } + + defer func() { + if err != nil { + kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), hostingPod.Name, hostingPod.Namespace, curLog) + } + }() + + curLog.WithField("pod name", hostingPod.Name).Info("Hosting pod is created") + + return nil +} + +func (e *podVolumeExposer) 
GetExposed(ctx context.Context, ownerObject corev1.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) { + hostingPodName := ownerObject.Name + + containerName := string(ownerObject.UID) + volumeName := string(ownerObject.UID) + + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + "node": nodeName, + }) + + var updated *corev1.Pod + err := wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) { + pod := &corev1.Pod{} + err := nodeClient.Get(ctx, types.NamespacedName{ + Namespace: ownerObject.Namespace, + Name: hostingPodName, + }, pod) + + if err != nil { + return false, errors.Wrapf(err, "error to get pod %s/%s", ownerObject.Namespace, hostingPodName) + } + + if pod.Status.Phase != corev1.PodRunning { + return false, nil + } + + updated = pod + + return true, nil + }) + + if err != nil { + if apierrors.IsNotFound(err) { + curLog.WithField("hosting pod", hostingPodName).Debug("Hosting pod is not running in the current node") + return nil, nil + } else { + return nil, errors.Wrapf(err, "error to wait for rediness of pod %s", hostingPodName) + } + } + + curLog.WithField("pod", updated.Name).Infof("Hosting pod is in running state in node %s", updated.Spec.NodeName) + + return &ExposeResult{ByPod: ExposeByPod{ + HostingPod: updated, + HostingContainer: containerName, + VolumeName: volumeName, + }}, nil +} + +func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1.ObjectReference) error { + hostingPodName := ownerObject.Name + + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + }) + + pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, hostingPodName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return nil + } + + if err != nil { + curLog.WithError(err).Warnf("error to peek hosting pod %s", hostingPodName) + return nil + } + + if podFailed, message := kube.IsPodUnrecoverable(pod, curLog); podFailed { + return errors.New(message) + } + + return nil +} + +func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev1.ObjectReference) string { + hostingPodName := ownerObject.Name + + diag := "begin diagnose pod volume exposer\n" + + pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, hostingPodName, metav1.GetOptions{}) + if err != nil { + pod = nil + diag += fmt.Sprintf("error getting hosting pod %s, err: %v\n", hostingPodName, err) + } + + if pod != nil { + diag += kube.DiagnosePod(pod) + + if pod.Spec.NodeName != "" { + if err := nodeagent.KbClientIsRunningInNode(ctx, ownerObject.Namespace, pod.Spec.NodeName, e.kubeClient); err != nil { + diag += fmt.Sprintf("node-agent is not running in node %s, err: %v\n", pod.Spec.NodeName, err) + } + } + } + + diag += "end diagnose pod volume exposer" + + return diag +} + +func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference) { + restorePodName := ownerObject.Name + kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log) +} + +func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1.ObjectReference, exposeType string, hostPath string, + operationTimeout time.Duration, label map[string]string, annotation map[string]string, selectedNode string, resources corev1.ResourceRequirements, nodeOS string) (*corev1.Pod, error) { + hostingPodName := ownerObject.Name + + containerName := string(ownerObject.UID) + clientVolumeName := 
string(ownerObject.UID) + clientVolumePath := "/" + clientVolumeName + + podInfo, err := getInheritedPodInfo(ctx, e.kubeClient, ownerObject.Namespace, nodeOS) + if err != nil { + return nil, errors.Wrap(err, "error to get inherited pod info from node-agent") + } + + var gracePeriod int64 + mountPropagation := corev1.MountPropagationHostToContainer + volumeMounts := []corev1.VolumeMount{{ + Name: clientVolumeName, + MountPath: clientVolumePath, + MountPropagation: &mountPropagation, + }} + volumeMounts = append(volumeMounts, podInfo.volumeMounts...) + + volumes := []corev1.Volume{{ + Name: clientVolumeName, + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: hostPath, + }, + }, + }} + volumes = append(volumes, podInfo.volumes...) + + if label == nil { + label = make(map[string]string) + } + + args := []string{ + fmt.Sprintf("--volume-path=%s", clientVolumePath), + fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()), + } + + command := []string{ + "/velero", + "pod-volume", + } + + if exposeType == PodVolumeExposeTypeBackup { + args = append(args, fmt.Sprintf("--pod-volume-backup=%s", ownerObject.Name)) + command = append(command, "backup") + label[podGroupLabel] = podGroupPodVolumeBackup + } else { + args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name)) + command = append(command, "restore") + label[podGroupLabel] = podGroupPodVolumeRestore + } + + args = append(args, podInfo.logFormatArgs...) + args = append(args, podInfo.logLevelArgs...) + + var securityCtx *corev1.PodSecurityContext + nodeSelector := map[string]string{} + podOS := corev1.PodOS{} + toleration := []corev1.Toleration{} + if nodeOS == kube.NodeOSWindows { + userID := "ContainerAdministrator" + securityCtx = &corev1.PodSecurityContext{ + WindowsOptions: &corev1.WindowsSecurityContextOptions{ + RunAsUserName: &userID, + }, + } + + nodeSelector[kube.NodeOSLabel] = kube.NodeOSWindows + podOS.Name = kube.NodeOSWindows + + toleration = append(toleration, corev1.Toleration{ + Key: "os", + Operator: "Equal", + Effect: "NoSchedule", + Value: "windows", + }) + } else { + userID := int64(0) + securityCtx = &corev1.PodSecurityContext{ + RunAsUser: &userID, + } + + nodeSelector[kube.NodeOSLabel] = kube.NodeOSLinux + podOS.Name = kube.NodeOSLinux + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: hostingPodName, + Namespace: ownerObject.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: ownerObject.APIVersion, + Kind: ownerObject.Kind, + Name: ownerObject.Name, + UID: ownerObject.UID, + Controller: boolptr.True(), + }, + }, + Labels: label, + Annotations: annotation, + }, + Spec: corev1.PodSpec{ + NodeSelector: nodeSelector, + OS: &podOS, + Containers: []corev1.Container{ + { + Name: containerName, + Image: podInfo.image, + ImagePullPolicy: corev1.PullNever, + Command: command, + Args: args, + VolumeMounts: volumeMounts, + Env: podInfo.env, + EnvFrom: podInfo.envFrom, + Resources: resources, + }, + }, + ServiceAccountName: podInfo.serviceAccount, + TerminationGracePeriodSeconds: &gracePeriod, + Volumes: volumes, + NodeName: selectedNode, + RestartPolicy: corev1.RestartPolicyNever, + SecurityContext: securityCtx, + Tolerations: toleration, + }, + } + + return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{}) +} diff --git a/pkg/exposer/pod_volume_test.go b/pkg/exposer/pod_volume_test.go new file mode 100644 index 000000000..8f4abab67 --- /dev/null +++ b/pkg/exposer/pod_volume_test.go @@ 
-0,0 +1,590 @@ +package exposer + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/datapath" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" + appsv1api "k8s.io/api/apps/v1" + corev1api "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestPodVolumeExpose(t *testing.T) { + backup := &velerov1.Backup{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Backup", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + UID: "fake-uid", + }, + } + + podWithNoNode := builder.ForPod("fake-ns", "fake-client-pod").Result() + podWithNode := builder.ForPod("fake-ns", "fake-client-pod").NodeName("fake-node").Result() + + node := builder.ForNode("fake-node").Result() + + daemonSet := &appsv1api.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + APIVersion: appsv1api.SchemeGroupVersion.String(), + }, + Spec: appsv1api.DaemonSetSpec{ + Template: corev1api.PodTemplateSpec{ + Spec: corev1api.PodSpec{ + Containers: []corev1api.Container{ + { + Name: "node-agent", + }, + }, + }, + }, + }, + } + + tests := []struct { + name string + snapshotClientObj []runtime.Object + kubeClientObj []runtime.Object + ownerBackup *velerov1.Backup + exposeParam PodVolumeExposeParam + funcGetPodVolumeHostPath func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) + funcExtractPodVolumeHostPath func(context.Context, string, kubernetes.Interface, string, string) (string, error) + err string + }{ + { + name: "get client pod fail", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + }, + err: "error getting client pod fake-client-pod: pods \"fake-client-pod\" not found", + }, + { + name: "client pod with no node name", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + }, + kubeClientObj: []runtime.Object{ + podWithNoNode, + }, + err: "client pod fake-client-pod doesn't have a node name", + }, + { + name: "get node os fail", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + }, + kubeClientObj: []runtime.Object{ + podWithNode, + }, + err: "error getting OS for node fake-node: error getting node fake-node: nodes \"fake-node\" not found", + }, + { + name: "get pod volume path fail", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + ClientPodVolume: "fake-client-volume", + }, + kubeClientObj: []runtime.Object{ + podWithNode, + node, + }, + funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) { + return datapath.AccessPoint{}, errors.New("fake-get-pod-volume-path-error") + }, + err: 
"error to get pod volume path: fake-get-pod-volume-path-error", + }, + { + name: "extract pod volume path fail", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + ClientPodVolume: "fake-client-volume", + }, + kubeClientObj: []runtime.Object{ + podWithNode, + node, + }, + funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) { + return datapath.AccessPoint{ + ByPath: "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", + }, nil + }, + funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { + return "", errors.New("fake-extract-error") + }, + err: "error to extract pod volume path: fake-extract-error", + }, + { + name: "create hosting pod fail", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + ClientPodVolume: "fake-client-volume", + }, + kubeClientObj: []runtime.Object{ + podWithNode, + node, + }, + funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) { + return datapath.AccessPoint{ + ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", + }, nil + }, + funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { + return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil + }, + err: "error to create hosting pod: error to get inherited pod info from node-agent: error to get node-agent pod template: error to get node-agent daemonset: daemonsets.apps \"node-agent\" not found", + }, + { + name: "succeed", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + ClientPodVolume: "fake-client-volume", + }, + kubeClientObj: []runtime.Object{ + podWithNode, + node, + daemonSet, + }, + funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) { + return datapath.AccessPoint{ + ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", + }, nil + }, + funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { + return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) 
+ + exposer := podVolumeExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + + var ownerObject corev1api.ObjectReference + if test.ownerBackup != nil { + ownerObject = corev1api.ObjectReference{ + Kind: test.ownerBackup.Kind, + Namespace: test.ownerBackup.Namespace, + Name: test.ownerBackup.Name, + UID: test.ownerBackup.UID, + APIVersion: test.ownerBackup.APIVersion, + } + } + + if test.funcGetPodVolumeHostPath != nil { + getPodVolumeHostPath = test.funcGetPodVolumeHostPath + } + + if test.funcExtractPodVolumeHostPath != nil { + extractPodVolumeHostPath = test.funcExtractPodVolumeHostPath + } + + err := exposer.Expose(context.Background(), ownerObject, test.exposeParam) + if err == nil { + assert.NoError(t, err) + + _, err = exposer.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(context.Background(), ownerObject.Name, metav1.GetOptions{}) + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.err) + } + }) + } +} + +func TestGetPodVolumeExpose(t *testing.T) { + backup := &velerov1.Backup{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Backup", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + UID: "fake-uid", + }, + } + + backupPodNotRunning := builder.ForPod(backup.Namespace, backup.Name).Result() + backupPodRunning := builder.ForPod(backup.Namespace, backup.Name).Phase(corev1api.PodRunning).Result() + + scheme := runtime.NewScheme() + corev1api.AddToScheme(scheme) + + tests := []struct { + name string + kubeClientObj []runtime.Object + ownerBackup *velerov1.Backup + nodeName string + Timeout time.Duration + err string + expectedResult *ExposeResult + }{ + { + name: "backup pod is not found", + ownerBackup: backup, + nodeName: "fake-node", + }, + { + name: "wait backup pod running fail", + ownerBackup: backup, + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + backupPodNotRunning, + }, + Timeout: time.Second, + err: "error to wait for rediness of pod fake-backup: context deadline exceeded", + }, + { + name: "succeed", + ownerBackup: backup, + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + backupPodRunning, + }, + Timeout: time.Second, + expectedResult: &ExposeResult{ + ByPod: ExposeByPod{ + HostingPod: backupPodRunning, + VolumeName: string(backup.UID), + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) 
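+			// GetExposed reads the hosting pod through a controller-runtime client, so the
+			// same objects are also loaded into the fake client built below.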
+ + fakeClientBuilder := clientFake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(scheme) + + fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build() + + exposer := podVolumeExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + + var ownerObject corev1api.ObjectReference + if test.ownerBackup != nil { + ownerObject = corev1api.ObjectReference{ + Kind: test.ownerBackup.Kind, + Namespace: test.ownerBackup.Namespace, + Name: test.ownerBackup.Name, + UID: test.ownerBackup.UID, + APIVersion: test.ownerBackup.APIVersion, + } + } + + result, err := exposer.GetExposed(context.Background(), ownerObject, fakeClient, test.nodeName, test.Timeout) + if test.err == "" { + assert.NoError(t, err) + + if test.expectedResult == nil { + assert.Nil(t, result) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expectedResult.ByPod.VolumeName, result.ByPod.VolumeName) + assert.Equal(t, test.expectedResult.ByPod.HostingPod.Name, result.ByPod.HostingPod.Name) + } + } else { + assert.EqualError(t, err, test.err) + } + }) + } +} + +func TestPodVolumePeekExpose(t *testing.T) { + backup := &velerov1.Backup{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Backup", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + UID: "fake-uid", + }, + } + + backupPodUrecoverable := &corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: backup.Namespace, + Name: backup.Name, + }, + Status: corev1api.PodStatus{ + Phase: corev1api.PodFailed, + }, + } + + backupPod := &corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: backup.Namespace, + Name: backup.Name, + }, + } + + scheme := runtime.NewScheme() + corev1api.AddToScheme(scheme) + + tests := []struct { + name string + kubeClientObj []runtime.Object + ownerBackup *velerov1.Backup + err string + }{ + { + name: "backup pod is not found", + ownerBackup: backup, + }, + { + name: "pod is unrecoverable", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + backupPodUrecoverable, + }, + err: "Pod is in abnormal state [Failed], message []", + }, + { + name: "succeed", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + backupPod, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) 
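+			// PeekExposed only looks up the hosting pod by the owner's name and reports an
+			// error when that pod is in an unrecoverable state.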
+ + exposer := podVolumeExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + + var ownerObject corev1api.ObjectReference + if test.ownerBackup != nil { + ownerObject = corev1api.ObjectReference{ + Kind: test.ownerBackup.Kind, + Namespace: test.ownerBackup.Namespace, + Name: test.ownerBackup.Name, + UID: test.ownerBackup.UID, + APIVersion: test.ownerBackup.APIVersion, + } + } + + err := exposer.PeekExposed(context.Background(), ownerObject) + if test.err == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.err) + } + }) + } +} + +func TestPodVolumeDiagnoseExpose(t *testing.T) { + backup := &velerov1.Backup{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Backup", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + UID: "fake-uid", + }, + } + + backupPodWithoutNodeName := corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: backup.APIVersion, + Kind: backup.Kind, + Name: backup.Name, + UID: backup.UID, + }, + }, + }, + Status: corev1api.PodStatus{ + Phase: corev1api.PodPending, + Conditions: []corev1api.PodCondition{ + { + Type: corev1api.PodInitialized, + Status: corev1api.ConditionTrue, + Message: "fake-pod-message", + }, + }, + }, + } + + backupPodWithNodeName := corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: backup.APIVersion, + Kind: backup.Kind, + Name: backup.Name, + UID: backup.UID, + }, + }, + }, + Spec: corev1api.PodSpec{ + NodeName: "fake-node", + }, + Status: corev1api.PodStatus{ + Phase: corev1api.PodPending, + Conditions: []corev1api.PodCondition{ + { + Type: corev1api.PodInitialized, + Status: corev1api.ConditionTrue, + Message: "fake-pod-message", + }, + }, + }, + } + + nodeAgentPod := corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "node-agent-pod-1", + Labels: map[string]string{"role": "node-agent"}, + }, + Spec: corev1api.PodSpec{ + NodeName: "fake-node", + }, + Status: corev1api.PodStatus{ + Phase: corev1api.PodRunning, + }, + } + + tests := []struct { + name string + ownerBackup *velerov1.Backup + kubeClientObj []runtime.Object + snapshotClientObj []runtime.Object + expected string + }{ + { + name: "no pod", + ownerBackup: backup, + expected: `begin diagnose pod volume exposer +error getting hosting pod fake-backup, err: pods "fake-backup" not found +end diagnose pod volume exposer`, + }, + { + name: "pod without node name, pvc without volume name, vs without status", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + &backupPodWithoutNodeName, + }, + expected: `begin diagnose pod volume exposer +Pod velero/fake-backup, phase Pending, node name +Pod condition Initialized, status True, reason , message fake-pod-message +end diagnose pod volume exposer`, + }, + { + name: "pod without node name", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + &backupPodWithoutNodeName, + }, + expected: `begin diagnose pod volume exposer +Pod velero/fake-backup, phase Pending, node name +Pod condition Initialized, status True, reason , message fake-pod-message +end diagnose pod volume exposer`, + }, + { + name: "pod with node name, no node agent", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + &backupPodWithNodeName, + }, + 
expected: `begin diagnose pod volume exposer +Pod velero/fake-backup, phase Pending, node name fake-node +Pod condition Initialized, status True, reason , message fake-pod-message +node-agent is not running in node fake-node, err: daemonset pod not found in running state in node fake-node +end diagnose pod volume exposer`, + }, + { + name: "pod with node name, node agent is running", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + &backupPodWithNodeName, + &nodeAgentPod, + }, + expected: `begin diagnose pod volume exposer +Pod velero/fake-backup, phase Pending, node name fake-node +Pod condition Initialized, status True, reason , message fake-pod-message +end diagnose pod volume exposer`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(tt.kubeClientObj...) + e := &podVolumeExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + var ownerObject corev1api.ObjectReference + if tt.ownerBackup != nil { + ownerObject = corev1api.ObjectReference{ + Kind: tt.ownerBackup.Kind, + Namespace: tt.ownerBackup.Namespace, + Name: tt.ownerBackup.Name, + UID: tt.ownerBackup.UID, + APIVersion: tt.ownerBackup.APIVersion, + } + } + + diag := e.DiagnoseExpose(context.Background(), ownerObject) + assert.Equal(t, tt.expected, diag) + }) + } +} diff --git a/pkg/exposer/types.go b/pkg/exposer/types.go index 5cdb9d497..b8f875a88 100644 --- a/pkg/exposer/types.go +++ b/pkg/exposer/types.go @@ -21,11 +21,13 @@ import ( ) const ( - AccessModeFileSystem = "by-file-system" - AccessModeBlock = "by-block-device" - podGroupLabel = "velero.io/exposer-pod-group" - podGroupSnapshot = "snapshot-exposer" - podGroupGenericRestore = "generic-restore-exposer" + AccessModeFileSystem = "by-file-system" + AccessModeBlock = "by-block-device" + podGroupLabel = "velero.io/exposer-pod-group" + podGroupSnapshot = "snapshot-exposer" + podGroupGenericRestore = "generic-restore-exposer" + podGroupPodVolumeBackup = "pod-volume-backup" + podGroupPodVolumeRestore = "pod-volume-restore" ) // ExposeResult defines the result of expose. diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 34b782cbc..1c0378901 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -41,6 +41,12 @@ const ( // nodeAgentRole marks pods with node-agent role on all nodes. 
nodeAgentRole = "node-agent" + + // hostPodVolume is the name of the volume in node-agent for host-pod mount + hostPodVolume = "host-pods" + + // HostPodVolumeMountPoint is the mount point of the volume in node-agent for host-pod mount + HostPodVolumeMountPoint = "host_pods" ) var ( @@ -249,3 +255,37 @@ func GetAnnotationValue(ctx context.Context, kubeClient kubernetes.Interface, na return val, nil } + +func GetHostPodPath(ctx context.Context, kubeClient kubernetes.Interface, namespace string, osType string) (string, error) { + dsName := daemonSet + if osType == kube.NodeOSWindows { + dsName = daemonsetWindows + } + + ds, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, dsName, metav1.GetOptions{}) + if err != nil { + return "", errors.Wrapf(err, "error getting daemonset %s", dsName) + } + + var volume *corev1api.Volume + for _, v := range ds.Spec.Template.Spec.Volumes { + if v.Name == hostPodVolume { + volume = &v + break + } + } + + if volume == nil { + return "", errors.New("host pod volume is not found") + } + + if volume.HostPath == nil { + return "", errors.New("host pod volume is not a host path volume") + } + + if volume.HostPath.Path == "" { + return "", errors.New("host pod volume path is empty") + } + + return volume.HostPath.Path, nil +} diff --git a/pkg/nodeagent/node_agent_test.go b/pkg/nodeagent/node_agent_test.go index 9a78ca033..118f36ecf 100644 --- a/pkg/nodeagent/node_agent_test.go +++ b/pkg/nodeagent/node_agent_test.go @@ -590,3 +590,164 @@ func TestGetAnnotationValue(t *testing.T) { }) } } + +func TestGetHostPodPath(t *testing.T) { + daemonSet := &appsv1api.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + } + + daemonSetWithHostPodVolume := &appsv1api.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + Spec: appsv1api.DaemonSetSpec{ + Template: corev1api.PodTemplateSpec{ + Spec: corev1api.PodSpec{ + Volumes: []corev1api.Volume{ + { + Name: hostPodVolume, + }, + }, + }, + }, + }, + } + + daemonSetWithHostPodVolumeAndEmptyPath := &appsv1api.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + Spec: appsv1api.DaemonSetSpec{ + Template: corev1api.PodTemplateSpec{ + Spec: corev1api.PodSpec{ + Volumes: []corev1api.Volume{ + { + Name: hostPodVolume, + VolumeSource: corev1api.VolumeSource{ + HostPath: &corev1api.HostPathVolumeSource{}, + }, + }, + }, + }, + }, + }, + } + + daemonSetWithHostPodVolumeAndValidPath := &appsv1api.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + Spec: appsv1api.DaemonSetSpec{ + Template: corev1api.PodTemplateSpec{ + Spec: corev1api.PodSpec{ + Volumes: []corev1api.Volume{ + { + Name: hostPodVolume, + VolumeSource: corev1api.VolumeSource{ + HostPath: &corev1api.HostPathVolumeSource{ + Path: "/var/lib/kubelet/pods", + }, + }, + }, + }, + }, + }, + }, + } + + tests := []struct { + name string + kubeClientObj []runtime.Object + namespace string + osType string + expectedValue string + expectErr string + }{ + { + name: "ds get error", + namespace: "fake-ns", + osType: kube.NodeOSWindows, + kubeClientObj: []runtime.Object{ + daemonSet, + }, + expectErr: "error getting daemonset node-agent-windows: daemonsets.apps \"node-agent-windows\" not found", + 
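+			// With osType Windows, GetHostPodPath looks up the "node-agent-windows"
+			// daemonset, while only the Linux "node-agent" daemonset is seeded above.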
}, + { + name: "no host pod volume", + namespace: "fake-ns", + osType: kube.NodeOSLinux, + kubeClientObj: []runtime.Object{ + daemonSet, + }, + expectErr: "host pod volume is not found", + }, + { + name: "no host pod volume path", + namespace: "fake-ns", + osType: kube.NodeOSLinux, + kubeClientObj: []runtime.Object{ + daemonSetWithHostPodVolume, + }, + expectErr: "host pod volume is not a host path volume", + }, + { + name: "empty host pod volume path", + namespace: "fake-ns", + osType: kube.NodeOSLinux, + kubeClientObj: []runtime.Object{ + daemonSetWithHostPodVolumeAndEmptyPath, + }, + expectErr: "host pod volume path is empty", + }, + { + name: "succeed", + namespace: "fake-ns", + osType: kube.NodeOSLinux, + kubeClientObj: []runtime.Object{ + daemonSetWithHostPodVolumeAndValidPath, + }, + expectedValue: "/var/lib/kubelet/pods", + }, + { + name: "succeed on empty os type", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + daemonSetWithHostPodVolumeAndValidPath, + }, + expectedValue: "/var/lib/kubelet/pods", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + path, err := GetHostPodPath(context.TODO(), fakeKubeClient, test.namespace, test.osType) + + if test.expectErr == "" { + assert.NoError(t, err) + assert.Equal(t, test.expectedValue, path) + } else { + assert.EqualError(t, err, test.expectErr) + } + }) + } +} diff --git a/pkg/util/kube/utils.go b/pkg/util/kube/utils.go index 5d64f117b..eab3193fc 100644 --- a/pkg/util/kube/utils.go +++ b/pkg/util/kube/utils.go @@ -25,7 +25,6 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1api "k8s.io/api/core/v1" - storagev1api "k8s.io/api/storage/v1" apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -33,8 +32,8 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "sigs.k8s.io/controller-runtime/pkg/client" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/label" @@ -148,8 +147,8 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core // GetVolumeDirectory gets the name of the directory on the host, under /var/lib/kubelet/pods//volumes/, // where the specified volume lives. // For volumes with a CSIVolumeSource, append "/mount" to the directory name. -func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (string, error) { - pvc, pv, volume, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli) +func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) (string, error) { + pvc, pv, volume, err := GetPodPVCVolume(ctx, log, pod, volumeName, kubeClient) if err != nil { // This case implies the administrator created the PV and attached it directly, without PVC. // Note that only one VolumeSource can be populated per Volume on a pod @@ -164,7 +163,7 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1 // Most common case is that we have a PVC VolumeSource, and we need to check the PV it points to for a CSI source. // PV's been created with a CSI source. 
- isProvisionedByCSI, err := isProvisionedByCSI(log, pv, cli) + isProvisionedByCSI, err := isProvisionedByCSI(log, pv, kubeClient) if err != nil { return "", errors.WithStack(err) } @@ -179,9 +178,9 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1 } // GetVolumeMode gets the uploader.PersistentVolumeMode of the volume. -func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) ( +func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) ( uploader.PersistentVolumeMode, error) { - _, pv, _, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli) + _, pv, _, err := GetPodPVCVolume(ctx, log, pod, volumeName, kubeClient) if err != nil { if err == ErrorPodVolumeIsNotPVC { @@ -198,7 +197,7 @@ func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.P // GetPodPVCVolume gets the PVC, PV and volume for a pod volume name. // Returns pod volume in case of ErrorPodVolumeIsNotPVC error -func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) ( +func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) ( *corev1api.PersistentVolumeClaim, *corev1api.PersistentVolume, *corev1api.Volume, error) { var volume *corev1api.Volume @@ -217,14 +216,12 @@ func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api return nil, nil, volume, ErrorPodVolumeIsNotPVC // There is a pod volume but it is not a PVC } - pvc := &corev1api.PersistentVolumeClaim{} - err := cli.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.VolumeSource.PersistentVolumeClaim.ClaimName}, pvc) + pvc, err := kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(ctx, volume.VolumeSource.PersistentVolumeClaim.ClaimName, metav1.GetOptions{}) if err != nil { return nil, nil, nil, errors.WithStack(err) } - pv := &corev1api.PersistentVolume{} - err = cli.Get(ctx, client.ObjectKey{Name: pvc.Spec.VolumeName}, pv) + pv, err := kubeClient.CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{}) if err != nil { return nil, nil, nil, errors.WithStack(err) } @@ -235,7 +232,7 @@ func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api // isProvisionedByCSI function checks whether this is a CSI PV by annotation. // Either "pv.kubernetes.io/provisioned-by" or "pv.kubernetes.io/migrated-to" indicates // PV is provisioned by CSI. 
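 // For example, a PV dynamically provisioned by a CSI driver such as ebs.csi.aws.com
 // carries pv.kubernetes.io/provisioned-by: ebs.csi.aws.com, which is matched against
 // the installed CSIDriver objects listed below.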
-func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, kbClient client.Client) (bool, error) { +func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, kubeClient kubernetes.Interface) (bool, error) { if pv.Spec.CSI != nil { return true, nil } @@ -245,10 +242,11 @@ func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, driverName := pv.Annotations[KubeAnnDynamicallyProvisioned] migratedDriver := pv.Annotations[KubeAnnMigratedTo] if len(driverName) > 0 || len(migratedDriver) > 0 { - list := &storagev1api.CSIDriverList{} - if err := kbClient.List(context.TODO(), list); err != nil { + list, err := kubeClient.StorageV1().CSIDrivers().List(context.TODO(), metav1.ListOptions{}) + if err != nil { return false, err } + for _, driver := range list.Items { if driverName == driver.Name || migratedDriver == driver.Name { log.Debugf("the annotation %s or %s equals to %s indicates the volume is provisioned by a CSI driver", KubeAnnDynamicallyProvisioned, KubeAnnMigratedTo, driver.Name) diff --git a/pkg/util/kube/utils_test.go b/pkg/util/kube/utils_test.go index b36ab8ef2..37f990778 100644 --- a/pkg/util/kube/utils_test.go +++ b/pkg/util/kube/utils_test.go @@ -34,11 +34,12 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/vmware-tanzu/velero/pkg/builder" velerotest "github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/uploader" + + "k8s.io/client-go/kubernetes/fake" ) func TestNamespaceAndName(t *testing.T) { @@ -216,17 +217,18 @@ func TestGetVolumeDirectorySuccess(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "csi.test.com"}, } for _, tc := range tests { - clientBuilder := fake.NewClientBuilder().WithLists(&storagev1api.CSIDriverList{Items: []storagev1api.CSIDriver{csiDriver}}) - + objs := []runtime.Object{&csiDriver} if tc.pvc != nil { - clientBuilder = clientBuilder.WithObjects(tc.pvc) + objs = append(objs, tc.pvc) } if tc.pv != nil { - clientBuilder = clientBuilder.WithObjects(tc.pv) + objs = append(objs, tc.pv) } + fakeKubeClient := fake.NewSimpleClientset(objs...) + // Function under test - dir, err := GetVolumeDirectory(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build()) + dir, err := GetVolumeDirectory(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, fakeKubeClient) require.NoError(t, err) assert.Equal(t, tc.want, dir) @@ -264,17 +266,18 @@ func TestGetVolumeModeSuccess(t *testing.T) { } for _, tc := range tests { - clientBuilder := fake.NewClientBuilder() - + objs := []runtime.Object{} if tc.pvc != nil { - clientBuilder = clientBuilder.WithObjects(tc.pvc) + objs = append(objs, tc.pvc) } if tc.pv != nil { - clientBuilder = clientBuilder.WithObjects(tc.pv) + objs = append(objs, tc.pv) } + fakeKubeClient := fake.NewSimpleClientset(objs...) 
+ // Function under test - mode, err := GetVolumeMode(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build()) + mode, err := GetVolumeMode(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, fakeKubeClient) require.NoError(t, err) assert.Equal(t, tc.want, mode) From d903e9eda75b46b12a1a3d7ac95815167045c8d1 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 29 May 2025 17:15:29 +0800 Subject: [PATCH 2/3] issue 8960: implement PodVolume exposer for PVB/PVR Signed-off-by: Lyndon-Li --- pkg/cmd/cli/nodeagent/server.go | 8 ++- pkg/cmd/cli/nodeagent/server_test.go | 2 +- pkg/exposer/host_path.go | 6 +-- pkg/exposer/pod_volume.go | 74 +++++++++++++--------------- pkg/exposer/pod_volume_test.go | 11 +++-- pkg/exposer/types.go | 12 ++--- pkg/install/daemonset.go | 5 +- pkg/nodeagent/node_agent.go | 14 ++++-- pkg/nodeagent/node_agent_test.go | 6 +-- 9 files changed, 69 insertions(+), 69 deletions(-) diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 0dca03665..bf43d18e1 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -80,8 +80,6 @@ const ( // files will be written to defaultCredentialsDirectory = "/tmp/credentials" - defaultHostPodsPath = "/host_pods" - defaultResourceTimeout = 10 * time.Minute defaultDataMoverPrepareTimeout = 30 * time.Minute defaultDataPathConcurrentNum = 1 @@ -416,10 +414,10 @@ func (s *nodeAgentServer) waitCacheForResume() error { // validatePodVolumesHostPath validates that the pod volumes path contains a // directory for each Pod running on this node func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface) error { - files, err := s.fileSystem.ReadDir(defaultHostPodsPath) + files, err := s.fileSystem.ReadDir(nodeagent.HostPodVolumeMountPath()) if err != nil { if errors.Is(err, os.ErrNotExist) { - s.logger.Warnf("Pod volumes host path [%s] doesn't exist, fs-backup is disabled", defaultHostPodsPath) + s.logger.Warnf("Pod volumes host path [%s] doesn't exist, fs-backup is disabled", nodeagent.HostPodVolumeMountPath()) return nil } return errors.Wrap(err, "could not read pod volumes host path") @@ -452,7 +450,7 @@ func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface valid = false s.logger.WithFields(logrus.Fields{ "pod": fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()), - "path": defaultHostPodsPath + "/" + dirName, + "path": nodeagent.HostPodVolumeMountPath() + "/" + dirName, }).Debug("could not find volumes for pod in host path") } } diff --git a/pkg/cmd/cli/nodeagent/server_test.go b/pkg/cmd/cli/nodeagent/server_test.go index 3d0290a70..2551bfac3 100644 --- a/pkg/cmd/cli/nodeagent/server_test.go +++ b/pkg/cmd/cli/nodeagent/server_test.go @@ -99,7 +99,7 @@ func Test_validatePodVolumesHostPath(t *testing.T) { for _, dir := range tt.dirs { if tt.createDir { - err := fs.MkdirAll(filepath.Join(defaultHostPodsPath, dir), os.ModePerm) + err := fs.MkdirAll(filepath.Join(nodeagent.HostPodVolumeMountPath(), dir), os.ModePerm) if err != nil { t.Error(err) } diff --git a/pkg/exposer/host_path.go b/pkg/exposer/host_path.go index a5668f8c0..e51178711 100644 --- a/pkg/exposer/host_path.go +++ b/pkg/exposer/host_path.go @@ -59,7 +59,7 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName st volSubDir = "volumeDevices" } - pathGlob := fmt.Sprintf("/%s/%s/%s/*/%s", nodeagent.HostPodVolumeMountPoint, string(pod.GetUID()), volSubDir, volDir) + pathGlob := 
fmt.Sprintf("%s/%s/%s/*/%s", nodeagent.HostPodVolumeMountPath(), string(pod.GetUID()), volSubDir, volDir) logger.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob") path, err := singlePathMatch(pathGlob, fs, logger) @@ -88,8 +88,8 @@ func ExtractPodVolumeHostPath(ctx context.Context, path string, kubeClient kuber } if osType == kube.NodeOSWindows { - return strings.Replace(path, "\\"+nodeagent.HostPodVolumeMountPoint, podPath, 1), nil + return strings.Replace(path, nodeagent.HostPodVolumeMountPathWin(), podPath, 1), nil } else { - return strings.Replace(path, "/"+nodeagent.HostPodVolumeMountPoint, podPath, 1), nil + return strings.Replace(path, nodeagent.HostPodVolumeMountPath(), podPath, 1), nil } } diff --git a/pkg/exposer/pod_volume.go b/pkg/exposer/pod_volume.go index 6e3910d12..6a62705d6 100644 --- a/pkg/exposer/pod_volume.go +++ b/pkg/exposer/pod_volume.go @@ -23,7 +23,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" + corev1api "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -60,7 +60,7 @@ type PodVolumeExposeParam struct { HostingPodAnnotations map[string]string // Resources defines the resource requirements of the hosting pod - Resources corev1.ResourceRequirements + Resources corev1api.ResourceRequirements // OperationTimeout specifies the time wait for resources operations in Expose OperationTimeout time.Duration @@ -72,24 +72,24 @@ type PodVolumeExposeParam struct { // PodVolumeExposer is the interfaces for a pod volume exposer type PodVolumeExposer interface { // Expose starts the process to a pod volume expose, the expose process may take long time - Expose(context.Context, corev1.ObjectReference, PodVolumeExposeParam) error + Expose(context.Context, corev1api.ObjectReference, PodVolumeExposeParam) error // GetExposed polls the status of the expose. // If the expose is accessible by the current caller, it waits the expose ready and returns the expose result. // Otherwise, it returns nil as the expose result without an error. - GetExposed(context.Context, corev1.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error) + GetExposed(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error) // PeekExposed tests the status of the expose. // If the expose is incomplete but not recoverable, it returns an error. // Otherwise, it returns nil immediately. - PeekExposed(context.Context, corev1.ObjectReference) error + PeekExposed(context.Context, corev1api.ObjectReference) error // DiagnoseExpose generate the diagnostic info when the expose is not finished for a long time. // If it finds any problem, it returns an string about the problem. 
- DiagnoseExpose(context.Context, corev1.ObjectReference) string + DiagnoseExpose(context.Context, corev1api.ObjectReference) string // CleanUp cleans up any objects generated during the restore expose - CleanUp(context.Context, corev1.ObjectReference) + CleanUp(context.Context, corev1api.ObjectReference) } // NewPodVolumeExposer creates a new instance of pod volume exposer @@ -110,7 +110,7 @@ type podVolumeExposer struct { var getPodVolumeHostPath = GetPodVolumeHostPath var extractPodVolumeHostPath = ExtractPodVolumeHostPath -func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param PodVolumeExposeParam) error { +func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.ObjectReference, param PodVolumeExposeParam) error { curLog := e.log.WithFields(logrus.Fields{ "owner": ownerObject.Name, "client pod": param.ClientPodName, @@ -163,7 +163,7 @@ func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1.Object return nil } -func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) { +func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1api.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) { hostingPodName := ownerObject.Name containerName := string(ownerObject.UID) @@ -174,9 +174,9 @@ func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1.Ob "node": nodeName, }) - var updated *corev1.Pod + var updated *corev1api.Pod err := wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) { - pod := &corev1.Pod{} + pod := &corev1api.Pod{} err := nodeClient.Get(ctx, types.NamespacedName{ Namespace: ownerObject.Namespace, Name: hostingPodName, @@ -186,7 +186,7 @@ func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1.Ob return false, errors.Wrapf(err, "error to get pod %s/%s", ownerObject.Namespace, hostingPodName) } - if pod.Status.Phase != corev1.PodRunning { + if pod.Status.Phase != corev1api.PodRunning { return false, nil } @@ -213,7 +213,7 @@ func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1.Ob }}, nil } -func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1.ObjectReference) error { +func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1api.ObjectReference) error { hostingPodName := ownerObject.Name curLog := e.log.WithFields(logrus.Fields{ @@ -237,7 +237,7 @@ func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1.O return nil } -func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev1.ObjectReference) string { +func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev1api.ObjectReference) string { hostingPodName := ownerObject.Name diag := "begin diagnose pod volume exposer\n" @@ -263,13 +263,13 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev return diag } -func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference) { +func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) { restorePodName := ownerObject.Name kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log) } -func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject 
corev1.ObjectReference, exposeType string, hostPath string, - operationTimeout time.Duration, label map[string]string, annotation map[string]string, selectedNode string, resources corev1.ResourceRequirements, nodeOS string) (*corev1.Pod, error) { +func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string, + operationTimeout time.Duration, label map[string]string, annotation map[string]string, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string) (*corev1api.Pod, error) { hostingPodName := ownerObject.Name containerName := string(ownerObject.UID) @@ -282,28 +282,24 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor } var gracePeriod int64 - mountPropagation := corev1.MountPropagationHostToContainer - volumeMounts := []corev1.VolumeMount{{ + mountPropagation := corev1api.MountPropagationHostToContainer + volumeMounts := []corev1api.VolumeMount{{ Name: clientVolumeName, MountPath: clientVolumePath, MountPropagation: &mountPropagation, }} volumeMounts = append(volumeMounts, podInfo.volumeMounts...) - volumes := []corev1.Volume{{ + volumes := []corev1api.Volume{{ Name: clientVolumeName, - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ + VolumeSource: corev1api.VolumeSource{ + HostPath: &corev1api.HostPathVolumeSource{ Path: hostPath, }, }, }} volumes = append(volumes, podInfo.volumes...) - if label == nil { - label = make(map[string]string) - } - args := []string{ fmt.Sprintf("--volume-path=%s", clientVolumePath), fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()), @@ -317,24 +313,22 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor if exposeType == PodVolumeExposeTypeBackup { args = append(args, fmt.Sprintf("--pod-volume-backup=%s", ownerObject.Name)) command = append(command, "backup") - label[podGroupLabel] = podGroupPodVolumeBackup } else { args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name)) command = append(command, "restore") - label[podGroupLabel] = podGroupPodVolumeRestore } args = append(args, podInfo.logFormatArgs...) args = append(args, podInfo.logLevelArgs...) 
- var securityCtx *corev1.PodSecurityContext + var securityCtx *corev1api.PodSecurityContext nodeSelector := map[string]string{} - podOS := corev1.PodOS{} - toleration := []corev1.Toleration{} + podOS := corev1api.PodOS{} + toleration := []corev1api.Toleration{} if nodeOS == kube.NodeOSWindows { userID := "ContainerAdministrator" - securityCtx = &corev1.PodSecurityContext{ - WindowsOptions: &corev1.WindowsSecurityContextOptions{ + securityCtx = &corev1api.PodSecurityContext{ + WindowsOptions: &corev1api.WindowsSecurityContextOptions{ RunAsUserName: &userID, }, } @@ -342,7 +336,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor nodeSelector[kube.NodeOSLabel] = kube.NodeOSWindows podOS.Name = kube.NodeOSWindows - toleration = append(toleration, corev1.Toleration{ + toleration = append(toleration, corev1api.Toleration{ Key: "os", Operator: "Equal", Effect: "NoSchedule", @@ -350,7 +344,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor }) } else { userID := int64(0) - securityCtx = &corev1.PodSecurityContext{ + securityCtx = &corev1api.PodSecurityContext{ RunAsUser: &userID, } @@ -358,7 +352,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor podOS.Name = kube.NodeOSLinux } - pod := &corev1.Pod{ + pod := &corev1api.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: hostingPodName, Namespace: ownerObject.Namespace, @@ -374,14 +368,14 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor Labels: label, Annotations: annotation, }, - Spec: corev1.PodSpec{ + Spec: corev1api.PodSpec{ NodeSelector: nodeSelector, OS: &podOS, - Containers: []corev1.Container{ + Containers: []corev1api.Container{ { Name: containerName, Image: podInfo.image, - ImagePullPolicy: corev1.PullNever, + ImagePullPolicy: corev1api.PullNever, Command: command, Args: args, VolumeMounts: volumeMounts, @@ -394,7 +388,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor TerminationGracePeriodSeconds: &gracePeriod, Volumes: volumes, NodeName: selectedNode, - RestartPolicy: corev1.RestartPolicyNever, + RestartPolicy: corev1api.RestartPolicyNever, SecurityContext: securityCtx, Tolerations: toleration, }, diff --git a/pkg/exposer/pod_volume_test.go b/pkg/exposer/pod_volume_test.go index 8f4abab67..aac24990f 100644 --- a/pkg/exposer/pod_volume_test.go +++ b/pkg/exposer/pod_volume_test.go @@ -8,11 +8,6 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" - velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - "github.com/vmware-tanzu/velero/pkg/builder" - "github.com/vmware-tanzu/velero/pkg/datapath" - velerotest "github.com/vmware-tanzu/velero/pkg/test" - "github.com/vmware-tanzu/velero/pkg/util/filesystem" appsv1api "k8s.io/api/apps/v1" corev1api "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -20,6 +15,12 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" + + velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/datapath" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) func TestPodVolumeExpose(t *testing.T) { diff --git a/pkg/exposer/types.go b/pkg/exposer/types.go index b8f875a88..5cdb9d497 100644 --- a/pkg/exposer/types.go +++ b/pkg/exposer/types.go @@ -21,13 +21,11 @@ import ( ) const ( 
- AccessModeFileSystem = "by-file-system" - AccessModeBlock = "by-block-device" - podGroupLabel = "velero.io/exposer-pod-group" - podGroupSnapshot = "snapshot-exposer" - podGroupGenericRestore = "generic-restore-exposer" - podGroupPodVolumeBackup = "pod-volume-backup" - podGroupPodVolumeRestore = "pod-volume-restore" + AccessModeFileSystem = "by-file-system" + AccessModeBlock = "by-block-device" + podGroupLabel = "velero.io/exposer-pod-group" + podGroupSnapshot = "snapshot-exposer" + podGroupGenericRestore = "generic-restore-exposer" ) // ExposeResult defines the result of expose. diff --git a/pkg/install/daemonset.go b/pkg/install/daemonset.go index 1dee41a33..7eaa2a094 100644 --- a/pkg/install/daemonset.go +++ b/pkg/install/daemonset.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/vmware-tanzu/velero/internal/velero" + "github.com/vmware-tanzu/velero/pkg/nodeagent" ) func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1api.DaemonSet { @@ -126,8 +127,8 @@ func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1api.DaemonSet }, VolumeMounts: []corev1api.VolumeMount{ { - Name: "host-pods", - MountPath: "/host_pods", + Name: nodeagent.HostPodVolumeMount, + MountPath: nodeagent.HostPodVolumeMountPath(), MountPropagation: &mountPropagationMode, }, { diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 1c0378901..3d1159085 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -42,8 +42,8 @@ const ( // nodeAgentRole marks pods with node-agent role on all nodes. nodeAgentRole = "node-agent" - // hostPodVolume is the name of the volume in node-agent for host-pod mount - hostPodVolume = "host-pods" + // HostPodVolumeMount is the name of the volume in node-agent for host-pod mount + HostPodVolumeMount = "host-pods" // HostPodVolumeMountPoint is the mount point of the volume in node-agent for host-pod mount HostPodVolumeMountPoint = "host_pods" @@ -269,7 +269,7 @@ func GetHostPodPath(ctx context.Context, kubeClient kubernetes.Interface, namesp var volume *corev1api.Volume for _, v := range ds.Spec.Template.Spec.Volumes { - if v.Name == hostPodVolume { + if v.Name == HostPodVolumeMount { volume = &v break } @@ -289,3 +289,11 @@ func GetHostPodPath(ctx context.Context, kubeClient kubernetes.Interface, namesp return volume.HostPath.Path, nil } + +func HostPodVolumeMountPath() string { + return "/" + HostPodVolumeMountPoint +} + +func HostPodVolumeMountPathWin() string { + return "\\" + HostPodVolumeMountPoint +} diff --git a/pkg/nodeagent/node_agent_test.go b/pkg/nodeagent/node_agent_test.go index 118f36ecf..8f94b1ff6 100644 --- a/pkg/nodeagent/node_agent_test.go +++ b/pkg/nodeagent/node_agent_test.go @@ -615,7 +615,7 @@ func TestGetHostPodPath(t *testing.T) { Spec: corev1api.PodSpec{ Volumes: []corev1api.Volume{ { - Name: hostPodVolume, + Name: HostPodVolumeMount, }, }, }, @@ -636,7 +636,7 @@ func TestGetHostPodPath(t *testing.T) { Spec: corev1api.PodSpec{ Volumes: []corev1api.Volume{ { - Name: hostPodVolume, + Name: HostPodVolumeMount, VolumeSource: corev1api.VolumeSource{ HostPath: &corev1api.HostPathVolumeSource{}, }, @@ -660,7 +660,7 @@ func TestGetHostPodPath(t *testing.T) { Spec: corev1api.PodSpec{ Volumes: []corev1api.Volume{ { - Name: hostPodVolume, + Name: HostPodVolumeMount, VolumeSource: corev1api.VolumeSource{ HostPath: &corev1api.HostPathVolumeSource{ Path: "/var/lib/kubelet/pods", From 829e75e9b7bc18d79c68544d9696707ac536f394 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Wed, 
4 Jun 2025 13:44:10 +0800 Subject: [PATCH 3/3] issue 8960: implement PodVolume exposer for PVB/PVR Signed-off-by: Lyndon-Li --- pkg/exposer/pod_volume.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/exposer/pod_volume.go b/pkg/exposer/pod_volume.go index 6a62705d6..b402ffecb 100644 --- a/pkg/exposer/pod_volume.go +++ b/pkg/exposer/pod_volume.go @@ -152,12 +152,6 @@ func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.Obj return errors.Wrapf(err, "error to create hosting pod") } - defer func() { - if err != nil { - kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), hostingPod.Name, hostingPod.Namespace, curLog) - } - }() - curLog.WithField("pod name", hostingPod.Name).Info("Hosting pod is created") return nil