issue 8960: implement PodVolume exposer for PVB/PVR

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
pull/8985/head
Lyndon-Li 2025-05-29 17:15:29 +08:00
parent 9dbfdbc4d8
commit d903e9eda7
9 changed files with 69 additions and 69 deletions

View File

@ -80,8 +80,6 @@ const (
// files will be written to
defaultCredentialsDirectory = "/tmp/credentials"
defaultHostPodsPath = "/host_pods"
defaultResourceTimeout = 10 * time.Minute
defaultDataMoverPrepareTimeout = 30 * time.Minute
defaultDataPathConcurrentNum = 1
@ -416,10 +414,10 @@ func (s *nodeAgentServer) waitCacheForResume() error {
// validatePodVolumesHostPath validates that the pod volumes path contains a
// directory for each Pod running on this node
func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface) error {
files, err := s.fileSystem.ReadDir(defaultHostPodsPath)
files, err := s.fileSystem.ReadDir(nodeagent.HostPodVolumeMountPath())
if err != nil {
if errors.Is(err, os.ErrNotExist) {
s.logger.Warnf("Pod volumes host path [%s] doesn't exist, fs-backup is disabled", defaultHostPodsPath)
s.logger.Warnf("Pod volumes host path [%s] doesn't exist, fs-backup is disabled", nodeagent.HostPodVolumeMountPath())
return nil
}
return errors.Wrap(err, "could not read pod volumes host path")
@ -452,7 +450,7 @@ func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface
valid = false
s.logger.WithFields(logrus.Fields{
"pod": fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()),
"path": defaultHostPodsPath + "/" + dirName,
"path": nodeagent.HostPodVolumeMountPath() + "/" + dirName,
}).Debug("could not find volumes for pod in host path")
}
}

View File

@ -99,7 +99,7 @@ func Test_validatePodVolumesHostPath(t *testing.T) {
for _, dir := range tt.dirs {
if tt.createDir {
err := fs.MkdirAll(filepath.Join(defaultHostPodsPath, dir), os.ModePerm)
err := fs.MkdirAll(filepath.Join(nodeagent.HostPodVolumeMountPath(), dir), os.ModePerm)
if err != nil {
t.Error(err)
}

View File

@ -59,7 +59,7 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName st
volSubDir = "volumeDevices"
}
pathGlob := fmt.Sprintf("/%s/%s/%s/*/%s", nodeagent.HostPodVolumeMountPoint, string(pod.GetUID()), volSubDir, volDir)
pathGlob := fmt.Sprintf("%s/%s/%s/*/%s", nodeagent.HostPodVolumeMountPath(), string(pod.GetUID()), volSubDir, volDir)
logger.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")
path, err := singlePathMatch(pathGlob, fs, logger)
@ -88,8 +88,8 @@ func ExtractPodVolumeHostPath(ctx context.Context, path string, kubeClient kuber
}
if osType == kube.NodeOSWindows {
return strings.Replace(path, "\\"+nodeagent.HostPodVolumeMountPoint, podPath, 1), nil
return strings.Replace(path, nodeagent.HostPodVolumeMountPathWin(), podPath, 1), nil
} else {
return strings.Replace(path, "/"+nodeagent.HostPodVolumeMountPoint, podPath, 1), nil
return strings.Replace(path, nodeagent.HostPodVolumeMountPath(), podPath, 1), nil
}
}

View File

@ -23,7 +23,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -60,7 +60,7 @@ type PodVolumeExposeParam struct {
HostingPodAnnotations map[string]string
// Resources defines the resource requirements of the hosting pod
Resources corev1.ResourceRequirements
Resources corev1api.ResourceRequirements
// OperationTimeout specifies the time wait for resources operations in Expose
OperationTimeout time.Duration
@ -72,24 +72,24 @@ type PodVolumeExposeParam struct {
// PodVolumeExposer is the interfaces for a pod volume exposer
type PodVolumeExposer interface {
// Expose starts the process to a pod volume expose, the expose process may take long time
Expose(context.Context, corev1.ObjectReference, PodVolumeExposeParam) error
Expose(context.Context, corev1api.ObjectReference, PodVolumeExposeParam) error
// GetExposed polls the status of the expose.
// If the expose is accessible by the current caller, it waits the expose ready and returns the expose result.
// Otherwise, it returns nil as the expose result without an error.
GetExposed(context.Context, corev1.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error)
GetExposed(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error)
// PeekExposed tests the status of the expose.
// If the expose is incomplete but not recoverable, it returns an error.
// Otherwise, it returns nil immediately.
PeekExposed(context.Context, corev1.ObjectReference) error
PeekExposed(context.Context, corev1api.ObjectReference) error
// DiagnoseExpose generate the diagnostic info when the expose is not finished for a long time.
// If it finds any problem, it returns an string about the problem.
DiagnoseExpose(context.Context, corev1.ObjectReference) string
DiagnoseExpose(context.Context, corev1api.ObjectReference) string
// CleanUp cleans up any objects generated during the restore expose
CleanUp(context.Context, corev1.ObjectReference)
CleanUp(context.Context, corev1api.ObjectReference)
}
// NewPodVolumeExposer creates a new instance of pod volume exposer
@ -110,7 +110,7 @@ type podVolumeExposer struct {
var getPodVolumeHostPath = GetPodVolumeHostPath
var extractPodVolumeHostPath = ExtractPodVolumeHostPath
func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param PodVolumeExposeParam) error {
func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.ObjectReference, param PodVolumeExposeParam) error {
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"client pod": param.ClientPodName,
@ -163,7 +163,7 @@ func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1.Object
return nil
}
func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) {
func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1api.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) {
hostingPodName := ownerObject.Name
containerName := string(ownerObject.UID)
@ -174,9 +174,9 @@ func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1.Ob
"node": nodeName,
})
var updated *corev1.Pod
var updated *corev1api.Pod
err := wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
pod := &corev1.Pod{}
pod := &corev1api.Pod{}
err := nodeClient.Get(ctx, types.NamespacedName{
Namespace: ownerObject.Namespace,
Name: hostingPodName,
@ -186,7 +186,7 @@ func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1.Ob
return false, errors.Wrapf(err, "error to get pod %s/%s", ownerObject.Namespace, hostingPodName)
}
if pod.Status.Phase != corev1.PodRunning {
if pod.Status.Phase != corev1api.PodRunning {
return false, nil
}
@ -213,7 +213,7 @@ func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1.Ob
}}, nil
}
func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1.ObjectReference) error {
func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1api.ObjectReference) error {
hostingPodName := ownerObject.Name
curLog := e.log.WithFields(logrus.Fields{
@ -237,7 +237,7 @@ func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1.O
return nil
}
func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev1.ObjectReference) string {
func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev1api.ObjectReference) string {
hostingPodName := ownerObject.Name
diag := "begin diagnose pod volume exposer\n"
@ -263,13 +263,13 @@ func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev
return diag
}
func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference) {
func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) {
restorePodName := ownerObject.Name
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log)
}
func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1.ObjectReference, exposeType string, hostPath string,
operationTimeout time.Duration, label map[string]string, annotation map[string]string, selectedNode string, resources corev1.ResourceRequirements, nodeOS string) (*corev1.Pod, error) {
func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string,
operationTimeout time.Duration, label map[string]string, annotation map[string]string, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string) (*corev1api.Pod, error) {
hostingPodName := ownerObject.Name
containerName := string(ownerObject.UID)
@ -282,28 +282,24 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
}
var gracePeriod int64
mountPropagation := corev1.MountPropagationHostToContainer
volumeMounts := []corev1.VolumeMount{{
mountPropagation := corev1api.MountPropagationHostToContainer
volumeMounts := []corev1api.VolumeMount{{
Name: clientVolumeName,
MountPath: clientVolumePath,
MountPropagation: &mountPropagation,
}}
volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
volumes := []corev1.Volume{{
volumes := []corev1api.Volume{{
Name: clientVolumeName,
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
VolumeSource: corev1api.VolumeSource{
HostPath: &corev1api.HostPathVolumeSource{
Path: hostPath,
},
},
}}
volumes = append(volumes, podInfo.volumes...)
if label == nil {
label = make(map[string]string)
}
args := []string{
fmt.Sprintf("--volume-path=%s", clientVolumePath),
fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()),
@ -317,24 +313,22 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
if exposeType == PodVolumeExposeTypeBackup {
args = append(args, fmt.Sprintf("--pod-volume-backup=%s", ownerObject.Name))
command = append(command, "backup")
label[podGroupLabel] = podGroupPodVolumeBackup
} else {
args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name))
command = append(command, "restore")
label[podGroupLabel] = podGroupPodVolumeRestore
}
args = append(args, podInfo.logFormatArgs...)
args = append(args, podInfo.logLevelArgs...)
var securityCtx *corev1.PodSecurityContext
var securityCtx *corev1api.PodSecurityContext
nodeSelector := map[string]string{}
podOS := corev1.PodOS{}
toleration := []corev1.Toleration{}
podOS := corev1api.PodOS{}
toleration := []corev1api.Toleration{}
if nodeOS == kube.NodeOSWindows {
userID := "ContainerAdministrator"
securityCtx = &corev1.PodSecurityContext{
WindowsOptions: &corev1.WindowsSecurityContextOptions{
securityCtx = &corev1api.PodSecurityContext{
WindowsOptions: &corev1api.WindowsSecurityContextOptions{
RunAsUserName: &userID,
},
}
@ -342,7 +336,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
nodeSelector[kube.NodeOSLabel] = kube.NodeOSWindows
podOS.Name = kube.NodeOSWindows
toleration = append(toleration, corev1.Toleration{
toleration = append(toleration, corev1api.Toleration{
Key: "os",
Operator: "Equal",
Effect: "NoSchedule",
@ -350,7 +344,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
})
} else {
userID := int64(0)
securityCtx = &corev1.PodSecurityContext{
securityCtx = &corev1api.PodSecurityContext{
RunAsUser: &userID,
}
@ -358,7 +352,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
podOS.Name = kube.NodeOSLinux
}
pod := &corev1.Pod{
pod := &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: hostingPodName,
Namespace: ownerObject.Namespace,
@ -374,14 +368,14 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
Labels: label,
Annotations: annotation,
},
Spec: corev1.PodSpec{
Spec: corev1api.PodSpec{
NodeSelector: nodeSelector,
OS: &podOS,
Containers: []corev1.Container{
Containers: []corev1api.Container{
{
Name: containerName,
Image: podInfo.image,
ImagePullPolicy: corev1.PullNever,
ImagePullPolicy: corev1api.PullNever,
Command: command,
Args: args,
VolumeMounts: volumeMounts,
@ -394,7 +388,7 @@ func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject cor
TerminationGracePeriodSeconds: &gracePeriod,
Volumes: volumes,
NodeName: selectedNode,
RestartPolicy: corev1.RestartPolicyNever,
RestartPolicy: corev1api.RestartPolicyNever,
SecurityContext: securityCtx,
Tolerations: toleration,
},

View File

@ -8,11 +8,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
appsv1api "k8s.io/api/apps/v1"
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -20,6 +15,12 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
func TestPodVolumeExpose(t *testing.T) {

View File

@ -21,13 +21,11 @@ import (
)
const (
AccessModeFileSystem = "by-file-system"
AccessModeBlock = "by-block-device"
podGroupLabel = "velero.io/exposer-pod-group"
podGroupSnapshot = "snapshot-exposer"
podGroupGenericRestore = "generic-restore-exposer"
podGroupPodVolumeBackup = "pod-volume-backup"
podGroupPodVolumeRestore = "pod-volume-restore"
AccessModeFileSystem = "by-file-system"
AccessModeBlock = "by-block-device"
podGroupLabel = "velero.io/exposer-pod-group"
podGroupSnapshot = "snapshot-exposer"
podGroupGenericRestore = "generic-restore-exposer"
)
// ExposeResult defines the result of expose.

View File

@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/vmware-tanzu/velero/internal/velero"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
)
func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1api.DaemonSet {
@ -126,8 +127,8 @@ func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1api.DaemonSet
},
VolumeMounts: []corev1api.VolumeMount{
{
Name: "host-pods",
MountPath: "/host_pods",
Name: nodeagent.HostPodVolumeMount,
MountPath: nodeagent.HostPodVolumeMountPath(),
MountPropagation: &mountPropagationMode,
},
{

View File

@ -42,8 +42,8 @@ const (
// nodeAgentRole marks pods with node-agent role on all nodes.
nodeAgentRole = "node-agent"
// hostPodVolume is the name of the volume in node-agent for host-pod mount
hostPodVolume = "host-pods"
// HostPodVolumeMount is the name of the volume in node-agent for host-pod mount
HostPodVolumeMount = "host-pods"
// HostPodVolumeMountPoint is the mount point of the volume in node-agent for host-pod mount
HostPodVolumeMountPoint = "host_pods"
@ -269,7 +269,7 @@ func GetHostPodPath(ctx context.Context, kubeClient kubernetes.Interface, namesp
var volume *corev1api.Volume
for _, v := range ds.Spec.Template.Spec.Volumes {
if v.Name == hostPodVolume {
if v.Name == HostPodVolumeMount {
volume = &v
break
}
@ -289,3 +289,11 @@ func GetHostPodPath(ctx context.Context, kubeClient kubernetes.Interface, namesp
return volume.HostPath.Path, nil
}
func HostPodVolumeMountPath() string {
return "/" + HostPodVolumeMountPoint
}
func HostPodVolumeMountPathWin() string {
return "\\" + HostPodVolumeMountPoint
}

View File

@ -615,7 +615,7 @@ func TestGetHostPodPath(t *testing.T) {
Spec: corev1api.PodSpec{
Volumes: []corev1api.Volume{
{
Name: hostPodVolume,
Name: HostPodVolumeMount,
},
},
},
@ -636,7 +636,7 @@ func TestGetHostPodPath(t *testing.T) {
Spec: corev1api.PodSpec{
Volumes: []corev1api.Volume{
{
Name: hostPodVolume,
Name: HostPodVolumeMount,
VolumeSource: corev1api.VolumeSource{
HostPath: &corev1api.HostPathVolumeSource{},
},
@ -660,7 +660,7 @@ func TestGetHostPodPath(t *testing.T) {
Spec: corev1api.PodSpec{
Volumes: []corev1api.Volume{
{
Name: hostPodVolume,
Name: HostPodVolumeMount,
VolumeSource: corev1api.VolumeSource{
HostPath: &corev1api.HostPathVolumeSource{
Path: "/var/lib/kubelet/pods",