Merge pull request #8985 from Lyndon-Li/pod-volume-exposer

Issue 8960: implement PodVolume exposer for PVB/PVR
pull/8990/head
Wenkai Yin(尹文开) 2025-06-06 15:36:58 +08:00 committed by GitHub
commit b58dbcb0b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 1330 additions and 52 deletions

View File

@ -0,0 +1 @@
Fix issue #8960, implement PodVolume exposer for PVB/PVR

View File

@ -76,8 +76,6 @@ const (
// the port where prometheus metrics are exposed
defaultMetricsAddress = ":8085"
defaultHostPodsPath = "/host_pods"
defaultResourceTimeout = 10 * time.Minute
defaultDataMoverPrepareTimeout = 30 * time.Minute
defaultDataPathConcurrentNum = 1
@ -301,14 +299,14 @@ func (s *nodeAgentServer) run() {
credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout)
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer,
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer,
credentialGetter, s.nodeName, s.mgr.GetScheme(), s.metrics, s.logger)
if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup)
}
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil {
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
@ -412,10 +410,10 @@ func (s *nodeAgentServer) waitCacheForResume() error {
// validatePodVolumesHostPath validates that the pod volumes path contains a
// directory for each Pod running on this node
func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface) error {
files, err := s.fileSystem.ReadDir(defaultHostPodsPath)
files, err := s.fileSystem.ReadDir(nodeagent.HostPodVolumeMountPath())
if err != nil {
if errors.Is(err, os.ErrNotExist) {
s.logger.Warnf("Pod volumes host path [%s] doesn't exist, fs-backup is disabled", defaultHostPodsPath)
s.logger.Warnf("Pod volumes host path [%s] doesn't exist, fs-backup is disabled", nodeagent.HostPodVolumeMountPath())
return nil
}
return errors.Wrap(err, "could not read pod volumes host path")
@ -448,7 +446,7 @@ func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface
valid = false
s.logger.WithFields(logrus.Fields{
"pod": fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()),
"path": defaultHostPodsPath + "/" + dirName,
"path": nodeagent.HostPodVolumeMountPath() + "/" + dirName,
}).Debug("could not find volumes for pod in host path")
}
}

View File

@ -99,7 +99,7 @@ func Test_validatePodVolumesHostPath(t *testing.T) {
for _, dir := range tt.dirs {
if tt.createDir {
err := fs.MkdirAll(filepath.Join(defaultHostPodsPath, dir), os.ModePerm)
err := fs.MkdirAll(filepath.Join(nodeagent.HostPodVolumeMountPath(), dir), os.ModePerm)
if err != nil {
t.Error(err)
}

View File

@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
clocks "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -48,10 +49,11 @@ import (
const pVBRRequestor string = "pod-volume-backup-restore"
// NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance
func NewPodVolumeBackupReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter,
func NewPodVolumeBackupReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter,
nodeName string, scheme *runtime.Scheme, metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler {
return &PodVolumeBackupReconciler{
Client: client,
kubeClient: kubeClient,
logger: logger.WithField("controller", "PodVolumeBackup"),
repositoryEnsurer: ensurer,
credentialGetter: credentialGetter,
@ -67,6 +69,7 @@ func NewPodVolumeBackupReconciler(client client.Client, dataPathMgr *datapath.Ma
// PodVolumeBackupReconciler reconciles a PodVolumeBackup object
type PodVolumeBackupReconciler struct {
client.Client
kubeClient kubernetes.Interface
scheme *runtime.Scheme
clock clocks.WithTickerAndDelayedExecution
metrics *metrics.ServerMetrics
@ -155,7 +158,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return r.errorOut(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name), log)
}
path, err := exposer.GetPodVolumeHostPath(ctx, &pod, pvb.Spec.Volume, r.Client, r.fileSystem, log)
path, err := exposer.GetPodVolumeHostPath(ctx, &pod, pvb.Spec.Volume, r.kubeClient, r.fileSystem, log)
if err != nil {
r.closeDataPath(ctx, pvb.Name)
return r.errorOut(ctx, &pvb, err, "error exposing host path for pod volume", log)

View File

@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
clocks "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -49,10 +50,11 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
func NewPodVolumeRestoreReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer,
func NewPodVolumeRestoreReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer,
credentialGetter *credentials.CredentialGetter, logger logrus.FieldLogger) *PodVolumeRestoreReconciler {
return &PodVolumeRestoreReconciler{
Client: client,
kubeClient: kubeClient,
logger: logger.WithField("controller", "PodVolumeRestore"),
repositoryEnsurer: ensurer,
credentialGetter: credentialGetter,
@ -64,6 +66,7 @@ func NewPodVolumeRestoreReconciler(client client.Client, dataPathMgr *datapath.M
type PodVolumeRestoreReconciler struct {
client.Client
kubeClient kubernetes.Interface
logger logrus.FieldLogger
repositoryEnsurer *repository.Ensurer
credentialGetter *credentials.CredentialGetter
@ -135,7 +138,7 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
return c.errorOut(ctx, pvr, err, "error to update status to in progress", log)
}
volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.Client, c.fileSystem, log)
volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.kubeClient, c.fileSystem, log)
if err != nil {
c.closeDataPath(ctx, pvr.Name)
return c.errorOut(ctx, pvr, err, "error exposing host path for pod volume", log)

View File

@ -19,13 +19,15 @@ package exposer
import (
"context"
"fmt"
"strings"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"k8s.io/client-go/kubernetes"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
@ -37,17 +39,17 @@ var singlePathMatch = kube.SinglePathMatch
// GetPodVolumeHostPath returns a path that can be accessed from the host for a given volume of a pod
func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName string,
cli ctrlclient.Client, fs filesystem.Interface, log logrus.FieldLogger) (datapath.AccessPoint, error) {
kubeClient kubernetes.Interface, fs filesystem.Interface, log logrus.FieldLogger) (datapath.AccessPoint, error) {
logger := log.WithField("pod name", pod.Name).WithField("pod UID", pod.GetUID()).WithField("volume", volumeName)
volDir, err := getVolumeDirectory(ctx, logger, pod, volumeName, cli)
volDir, err := getVolumeDirectory(ctx, logger, pod, volumeName, kubeClient)
if err != nil {
return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume directory name for volume %s in pod %s", volumeName, pod.Name)
}
logger.WithField("volDir", volDir).Info("Got volume dir")
volMode, err := getVolumeMode(ctx, logger, pod, volumeName, cli)
volMode, err := getVolumeMode(ctx, logger, pod, volumeName, kubeClient)
if err != nil {
return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume mode for volume %s in pod %s", volumeName, pod.Name)
}
@ -57,7 +59,7 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName st
volSubDir = "volumeDevices"
}
pathGlob := fmt.Sprintf("/host_pods/%s/%s/*/%s", string(pod.GetUID()), volSubDir, volDir)
pathGlob := fmt.Sprintf("%s/%s/%s/*/%s", nodeagent.HostPodVolumeMountPath(), string(pod.GetUID()), volSubDir, volDir)
logger.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")
path, err := singlePathMatch(pathGlob, fs, logger)
@ -72,3 +74,22 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName st
VolMode: volMode,
}, nil
}
var getHostPodPath = nodeagent.GetHostPodPath
func ExtractPodVolumeHostPath(ctx context.Context, path string, kubeClient kubernetes.Interface, veleroNamespace string, osType string) (string, error) {
podPath, err := getHostPodPath(ctx, kubeClient, veleroNamespace, osType)
if err != nil {
return "", errors.Wrap(err, "error getting host pod path from node-agent")
}
if osType == kube.NodeOSWindows {
podPath = strings.Replace(podPath, "/", "\\", -1)
}
if osType == kube.NodeOSWindows {
return strings.Replace(path, nodeagent.HostPodVolumeMountPathWin(), podPath, 1), nil
} else {
return strings.Replace(path, nodeagent.HostPodVolumeMountPath(), podPath, 1), nil
}
}

View File

@ -18,26 +18,29 @@ package exposer
import (
"context"
"fmt"
"testing"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
corev1api "k8s.io/api/core/v1"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"k8s.io/client-go/kubernetes"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
func TestGetPodVolumeHostPath(t *testing.T) {
tests := []struct {
name string
getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error)
getVolumeModeFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error)
getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error)
getVolumeModeFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (uploader.PersistentVolumeMode, error)
pathMatchFunc func(string, filesystem.Interface, logrus.FieldLogger) (string, error)
pod *corev1api.Pod
pvc string
@ -45,7 +48,7 @@ func TestGetPodVolumeHostPath(t *testing.T) {
}{
{
name: "get volume dir fail",
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error) {
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error) {
return "", errors.New("fake-error-1")
},
pod: builder.ForPod(velerov1api.DefaultNamespace, "fake-pod-1").Result(),
@ -54,10 +57,10 @@ func TestGetPodVolumeHostPath(t *testing.T) {
},
{
name: "single path match fail",
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error) {
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error) {
return "", nil
},
getVolumeModeFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error) {
getVolumeModeFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (uploader.PersistentVolumeMode, error) {
return uploader.PersistentVolumeFilesystem, nil
},
pathMatchFunc: func(string, filesystem.Interface, logrus.FieldLogger) (string, error) {
@ -69,7 +72,7 @@ func TestGetPodVolumeHostPath(t *testing.T) {
},
{
name: "get block volume dir success",
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (
string, error) {
return "fake-pvc-1", nil
},
@ -102,3 +105,58 @@ func TestGetPodVolumeHostPath(t *testing.T) {
})
}
}
func TestExtractPodVolumeHostPath(t *testing.T) {
tests := []struct {
name string
getHostPodPathFunc func(context.Context, kubernetes.Interface, string, string) (string, error)
path string
osType string
expectedErr string
expected string
}{
{
name: "get host pod path error",
getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) {
return "", errors.New("fake-error-1")
},
expectedErr: "error getting host pod path from node-agent: fake-error-1",
},
{
name: "Windows os",
getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) {
return "/var/lib/kubelet/pods", nil
},
path: fmt.Sprintf("\\%s\\pod-id-xxx\\volumes\\kubernetes.io~csi\\pvc-id-xxx\\mount", nodeagent.HostPodVolumeMountPoint),
osType: kube.NodeOSWindows,
expected: "\\var\\lib\\kubelet\\pods\\pod-id-xxx\\volumes\\kubernetes.io~csi\\pvc-id-xxx\\mount",
},
{
name: "linux OS",
getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) {
return "/var/lib/kubelet/pods", nil
},
path: fmt.Sprintf("/%s/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nodeagent.HostPodVolumeMountPoint),
osType: kube.NodeOSLinux,
expected: "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.getHostPodPathFunc != nil {
getHostPodPath = test.getHostPodPathFunc
}
path, err := ExtractPodVolumeHostPath(context.Background(), test.path, nil, "", test.osType)
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expected, path)
}
})
}
}

392
pkg/exposer/pod_volume.go Normal file
View File

@ -0,0 +1,392 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exposer
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
const (
PodVolumeExposeTypeBackup = "pod-volume-backup"
PodVolumeExposeTypeRestore = "pod-volume-restore"
)
// PodVolumeExposeParam define the input param for pod volume Expose
type PodVolumeExposeParam struct {
// ClientPodName is the name of pod to be backed up or restored
ClientPodName string
// ClientNamespace is the namespace to be backed up or restored
ClientNamespace string
// ClientNamespace is the pod volume for the client PVC
ClientPodVolume string
// HostingPodLabels is the labels that are going to apply to the hosting pod
HostingPodLabels map[string]string
// HostingPodAnnotations is the annotations that are going to apply to the hosting pod
HostingPodAnnotations map[string]string
// Resources defines the resource requirements of the hosting pod
Resources corev1api.ResourceRequirements
// OperationTimeout specifies the time wait for resources operations in Expose
OperationTimeout time.Duration
// Type specifies the type of the expose, either backup or erstore
Type string
}
// PodVolumeExposer is the interfaces for a pod volume exposer
type PodVolumeExposer interface {
// Expose starts the process to a pod volume expose, the expose process may take long time
Expose(context.Context, corev1api.ObjectReference, PodVolumeExposeParam) error
// GetExposed polls the status of the expose.
// If the expose is accessible by the current caller, it waits the expose ready and returns the expose result.
// Otherwise, it returns nil as the expose result without an error.
GetExposed(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error)
// PeekExposed tests the status of the expose.
// If the expose is incomplete but not recoverable, it returns an error.
// Otherwise, it returns nil immediately.
PeekExposed(context.Context, corev1api.ObjectReference) error
// DiagnoseExpose generate the diagnostic info when the expose is not finished for a long time.
// If it finds any problem, it returns an string about the problem.
DiagnoseExpose(context.Context, corev1api.ObjectReference) string
// CleanUp cleans up any objects generated during the restore expose
CleanUp(context.Context, corev1api.ObjectReference)
}
// NewPodVolumeExposer creates a new instance of pod volume exposer
func NewPodVolumeExposer(kubeClient kubernetes.Interface, log logrus.FieldLogger) PodVolumeExposer {
return &podVolumeExposer{
kubeClient: kubeClient,
fs: filesystem.NewFileSystem(),
log: log,
}
}
type podVolumeExposer struct {
kubeClient kubernetes.Interface
fs filesystem.Interface
log logrus.FieldLogger
}
var getPodVolumeHostPath = GetPodVolumeHostPath
var extractPodVolumeHostPath = ExtractPodVolumeHostPath
func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.ObjectReference, param PodVolumeExposeParam) error {
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"client pod": param.ClientPodName,
"client pod volume": param.ClientPodVolume,
"client namespace": param.ClientNamespace,
"type": param.Type,
})
pod, err := e.kubeClient.CoreV1().Pods(param.ClientNamespace).Get(ctx, param.ClientPodName, metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "error getting client pod %s", param.ClientPodName)
}
if pod.Spec.NodeName == "" {
return errors.Errorf("client pod %s doesn't have a node name", pod.Name)
}
nodeOS, err := kube.GetNodeOS(ctx, pod.Spec.NodeName, e.kubeClient.CoreV1())
if err != nil {
return errors.Wrapf(err, "error getting OS for node %s", pod.Spec.NodeName)
}
curLog.Infof("Client pod is running in node %s, os %s", pod.Spec.NodeName, nodeOS)
path, err := getPodVolumeHostPath(ctx, pod, param.ClientPodVolume, e.kubeClient, e.fs, e.log)
if err != nil {
return errors.Wrapf(err, "error to get pod volume path")
}
path.ByPath, err = extractPodVolumeHostPath(ctx, path.ByPath, e.kubeClient, ownerObject.Namespace, nodeOS)
if err != nil {
return errors.Wrapf(err, "error to extract pod volume path")
}
curLog.WithField("path", path).Infof("Host path is retrieved for pod %s, volume %s", param.ClientPodName, param.ClientPodVolume)
hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, pod.Spec.NodeName, param.Resources, nodeOS)
if err != nil {
return errors.Wrapf(err, "error to create hosting pod")
}
curLog.WithField("pod name", hostingPod.Name).Info("Hosting pod is created")
return nil
}
func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1api.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) {
hostingPodName := ownerObject.Name
containerName := string(ownerObject.UID)
volumeName := string(ownerObject.UID)
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"node": nodeName,
})
var updated *corev1api.Pod
err := wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
pod := &corev1api.Pod{}
err := nodeClient.Get(ctx, types.NamespacedName{
Namespace: ownerObject.Namespace,
Name: hostingPodName,
}, pod)
if err != nil {
return false, errors.Wrapf(err, "error to get pod %s/%s", ownerObject.Namespace, hostingPodName)
}
if pod.Status.Phase != corev1api.PodRunning {
return false, nil
}
updated = pod
return true, nil
})
if err != nil {
if apierrors.IsNotFound(err) {
curLog.WithField("hosting pod", hostingPodName).Debug("Hosting pod is not running in the current node")
return nil, nil
} else {
return nil, errors.Wrapf(err, "error to wait for rediness of pod %s", hostingPodName)
}
}
curLog.WithField("pod", updated.Name).Infof("Hosting pod is in running state in node %s", updated.Spec.NodeName)
return &ExposeResult{ByPod: ExposeByPod{
HostingPod: updated,
HostingContainer: containerName,
VolumeName: volumeName,
}}, nil
}
func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1api.ObjectReference) error {
hostingPodName := ownerObject.Name
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
})
pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, hostingPodName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
curLog.WithError(err).Warnf("error to peek hosting pod %s", hostingPodName)
return nil
}
if podFailed, message := kube.IsPodUnrecoverable(pod, curLog); podFailed {
return errors.New(message)
}
return nil
}
func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev1api.ObjectReference) string {
hostingPodName := ownerObject.Name
diag := "begin diagnose pod volume exposer\n"
pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, hostingPodName, metav1.GetOptions{})
if err != nil {
pod = nil
diag += fmt.Sprintf("error getting hosting pod %s, err: %v\n", hostingPodName, err)
}
if pod != nil {
diag += kube.DiagnosePod(pod)
if pod.Spec.NodeName != "" {
if err := nodeagent.KbClientIsRunningInNode(ctx, ownerObject.Namespace, pod.Spec.NodeName, e.kubeClient); err != nil {
diag += fmt.Sprintf("node-agent is not running in node %s, err: %v\n", pod.Spec.NodeName, err)
}
}
}
diag += "end diagnose pod volume exposer"
return diag
}
func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) {
restorePodName := ownerObject.Name
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log)
}
func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string,
operationTimeout time.Duration, label map[string]string, annotation map[string]string, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string) (*corev1api.Pod, error) {
hostingPodName := ownerObject.Name
containerName := string(ownerObject.UID)
clientVolumeName := string(ownerObject.UID)
clientVolumePath := "/" + clientVolumeName
podInfo, err := getInheritedPodInfo(ctx, e.kubeClient, ownerObject.Namespace, nodeOS)
if err != nil {
return nil, errors.Wrap(err, "error to get inherited pod info from node-agent")
}
var gracePeriod int64
mountPropagation := corev1api.MountPropagationHostToContainer
volumeMounts := []corev1api.VolumeMount{{
Name: clientVolumeName,
MountPath: clientVolumePath,
MountPropagation: &mountPropagation,
}}
volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
volumes := []corev1api.Volume{{
Name: clientVolumeName,
VolumeSource: corev1api.VolumeSource{
HostPath: &corev1api.HostPathVolumeSource{
Path: hostPath,
},
},
}}
volumes = append(volumes, podInfo.volumes...)
args := []string{
fmt.Sprintf("--volume-path=%s", clientVolumePath),
fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()),
}
command := []string{
"/velero",
"pod-volume",
}
if exposeType == PodVolumeExposeTypeBackup {
args = append(args, fmt.Sprintf("--pod-volume-backup=%s", ownerObject.Name))
command = append(command, "backup")
} else {
args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name))
command = append(command, "restore")
}
args = append(args, podInfo.logFormatArgs...)
args = append(args, podInfo.logLevelArgs...)
var securityCtx *corev1api.PodSecurityContext
nodeSelector := map[string]string{}
podOS := corev1api.PodOS{}
toleration := []corev1api.Toleration{}
if nodeOS == kube.NodeOSWindows {
userID := "ContainerAdministrator"
securityCtx = &corev1api.PodSecurityContext{
WindowsOptions: &corev1api.WindowsSecurityContextOptions{
RunAsUserName: &userID,
},
}
nodeSelector[kube.NodeOSLabel] = kube.NodeOSWindows
podOS.Name = kube.NodeOSWindows
toleration = append(toleration, corev1api.Toleration{
Key: "os",
Operator: "Equal",
Effect: "NoSchedule",
Value: "windows",
})
} else {
userID := int64(0)
securityCtx = &corev1api.PodSecurityContext{
RunAsUser: &userID,
}
nodeSelector[kube.NodeOSLabel] = kube.NodeOSLinux
podOS.Name = kube.NodeOSLinux
}
pod := &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: hostingPodName,
Namespace: ownerObject.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: ownerObject.APIVersion,
Kind: ownerObject.Kind,
Name: ownerObject.Name,
UID: ownerObject.UID,
Controller: boolptr.True(),
},
},
Labels: label,
Annotations: annotation,
},
Spec: corev1api.PodSpec{
NodeSelector: nodeSelector,
OS: &podOS,
Containers: []corev1api.Container{
{
Name: containerName,
Image: podInfo.image,
ImagePullPolicy: corev1api.PullNever,
Command: command,
Args: args,
VolumeMounts: volumeMounts,
Env: podInfo.env,
EnvFrom: podInfo.envFrom,
Resources: resources,
},
},
ServiceAccountName: podInfo.serviceAccount,
TerminationGracePeriodSeconds: &gracePeriod,
Volumes: volumes,
NodeName: selectedNode,
RestartPolicy: corev1api.RestartPolicyNever,
SecurityContext: securityCtx,
Tolerations: toleration,
},
}
return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}

View File

@ -0,0 +1,591 @@
package exposer
import (
"context"
"errors"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
appsv1api "k8s.io/api/apps/v1"
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
func TestPodVolumeExpose(t *testing.T) {
backup := &velerov1.Backup{
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1.SchemeGroupVersion.String(),
Kind: "Backup",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-backup",
UID: "fake-uid",
},
}
podWithNoNode := builder.ForPod("fake-ns", "fake-client-pod").Result()
podWithNode := builder.ForPod("fake-ns", "fake-client-pod").NodeName("fake-node").Result()
node := builder.ForNode("fake-node").Result()
daemonSet := &appsv1api.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "node-agent",
},
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
APIVersion: appsv1api.SchemeGroupVersion.String(),
},
Spec: appsv1api.DaemonSetSpec{
Template: corev1api.PodTemplateSpec{
Spec: corev1api.PodSpec{
Containers: []corev1api.Container{
{
Name: "node-agent",
},
},
},
},
},
}
tests := []struct {
name string
snapshotClientObj []runtime.Object
kubeClientObj []runtime.Object
ownerBackup *velerov1.Backup
exposeParam PodVolumeExposeParam
funcGetPodVolumeHostPath func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error)
funcExtractPodVolumeHostPath func(context.Context, string, kubernetes.Interface, string, string) (string, error)
err string
}{
{
name: "get client pod fail",
ownerBackup: backup,
exposeParam: PodVolumeExposeParam{
ClientNamespace: "fake-ns",
ClientPodName: "fake-client-pod",
},
err: "error getting client pod fake-client-pod: pods \"fake-client-pod\" not found",
},
{
name: "client pod with no node name",
ownerBackup: backup,
exposeParam: PodVolumeExposeParam{
ClientNamespace: "fake-ns",
ClientPodName: "fake-client-pod",
},
kubeClientObj: []runtime.Object{
podWithNoNode,
},
err: "client pod fake-client-pod doesn't have a node name",
},
{
name: "get node os fail",
ownerBackup: backup,
exposeParam: PodVolumeExposeParam{
ClientNamespace: "fake-ns",
ClientPodName: "fake-client-pod",
},
kubeClientObj: []runtime.Object{
podWithNode,
},
err: "error getting OS for node fake-node: error getting node fake-node: nodes \"fake-node\" not found",
},
{
name: "get pod volume path fail",
ownerBackup: backup,
exposeParam: PodVolumeExposeParam{
ClientNamespace: "fake-ns",
ClientPodName: "fake-client-pod",
ClientPodVolume: "fake-client-volume",
},
kubeClientObj: []runtime.Object{
podWithNode,
node,
},
funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) {
return datapath.AccessPoint{}, errors.New("fake-get-pod-volume-path-error")
},
err: "error to get pod volume path: fake-get-pod-volume-path-error",
},
{
name: "extract pod volume path fail",
ownerBackup: backup,
exposeParam: PodVolumeExposeParam{
ClientNamespace: "fake-ns",
ClientPodName: "fake-client-pod",
ClientPodVolume: "fake-client-volume",
},
kubeClientObj: []runtime.Object{
podWithNode,
node,
},
funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) {
return datapath.AccessPoint{
ByPath: "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount",
}, nil
},
funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) {
return "", errors.New("fake-extract-error")
},
err: "error to extract pod volume path: fake-extract-error",
},
{
name: "create hosting pod fail",
ownerBackup: backup,
exposeParam: PodVolumeExposeParam{
ClientNamespace: "fake-ns",
ClientPodName: "fake-client-pod",
ClientPodVolume: "fake-client-volume",
},
kubeClientObj: []runtime.Object{
podWithNode,
node,
},
funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) {
return datapath.AccessPoint{
ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount",
}, nil
},
funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) {
return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil
},
err: "error to create hosting pod: error to get inherited pod info from node-agent: error to get node-agent pod template: error to get node-agent daemonset: daemonsets.apps \"node-agent\" not found",
},
{
name: "succeed",
ownerBackup: backup,
exposeParam: PodVolumeExposeParam{
ClientNamespace: "fake-ns",
ClientPodName: "fake-client-pod",
ClientPodVolume: "fake-client-volume",
},
kubeClientObj: []runtime.Object{
podWithNode,
node,
daemonSet,
},
funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) {
return datapath.AccessPoint{
ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount",
}, nil
},
funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) {
return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
exposer := podVolumeExposer{
kubeClient: fakeKubeClient,
log: velerotest.NewLogger(),
}
var ownerObject corev1api.ObjectReference
if test.ownerBackup != nil {
ownerObject = corev1api.ObjectReference{
Kind: test.ownerBackup.Kind,
Namespace: test.ownerBackup.Namespace,
Name: test.ownerBackup.Name,
UID: test.ownerBackup.UID,
APIVersion: test.ownerBackup.APIVersion,
}
}
if test.funcGetPodVolumeHostPath != nil {
getPodVolumeHostPath = test.funcGetPodVolumeHostPath
}
if test.funcExtractPodVolumeHostPath != nil {
extractPodVolumeHostPath = test.funcExtractPodVolumeHostPath
}
err := exposer.Expose(context.Background(), ownerObject, test.exposeParam)
if err == nil {
assert.NoError(t, err)
_, err = exposer.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(context.Background(), ownerObject.Name, metav1.GetOptions{})
assert.NoError(t, err)
} else {
assert.EqualError(t, err, test.err)
}
})
}
}
func TestGetPodVolumeExpose(t *testing.T) {
backup := &velerov1.Backup{
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1.SchemeGroupVersion.String(),
Kind: "Backup",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-backup",
UID: "fake-uid",
},
}
backupPodNotRunning := builder.ForPod(backup.Namespace, backup.Name).Result()
backupPodRunning := builder.ForPod(backup.Namespace, backup.Name).Phase(corev1api.PodRunning).Result()
scheme := runtime.NewScheme()
corev1api.AddToScheme(scheme)
tests := []struct {
name string
kubeClientObj []runtime.Object
ownerBackup *velerov1.Backup
nodeName string
Timeout time.Duration
err string
expectedResult *ExposeResult
}{
{
name: "backup pod is not found",
ownerBackup: backup,
nodeName: "fake-node",
},
{
name: "wait backup pod running fail",
ownerBackup: backup,
nodeName: "fake-node",
kubeClientObj: []runtime.Object{
backupPodNotRunning,
},
Timeout: time.Second,
err: "error to wait for rediness of pod fake-backup: context deadline exceeded",
},
{
name: "succeed",
ownerBackup: backup,
nodeName: "fake-node",
kubeClientObj: []runtime.Object{
backupPodRunning,
},
Timeout: time.Second,
expectedResult: &ExposeResult{
ByPod: ExposeByPod{
HostingPod: backupPodRunning,
VolumeName: string(backup.UID),
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
fakeClientBuilder := clientFake.NewClientBuilder()
fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
exposer := podVolumeExposer{
kubeClient: fakeKubeClient,
log: velerotest.NewLogger(),
}
var ownerObject corev1api.ObjectReference
if test.ownerBackup != nil {
ownerObject = corev1api.ObjectReference{
Kind: test.ownerBackup.Kind,
Namespace: test.ownerBackup.Namespace,
Name: test.ownerBackup.Name,
UID: test.ownerBackup.UID,
APIVersion: test.ownerBackup.APIVersion,
}
}
result, err := exposer.GetExposed(context.Background(), ownerObject, fakeClient, test.nodeName, test.Timeout)
if test.err == "" {
assert.NoError(t, err)
if test.expectedResult == nil {
assert.Nil(t, result)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expectedResult.ByPod.VolumeName, result.ByPod.VolumeName)
assert.Equal(t, test.expectedResult.ByPod.HostingPod.Name, result.ByPod.HostingPod.Name)
}
} else {
assert.EqualError(t, err, test.err)
}
})
}
}
func TestPodVolumePeekExpose(t *testing.T) {
backup := &velerov1.Backup{
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1.SchemeGroupVersion.String(),
Kind: "Backup",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-backup",
UID: "fake-uid",
},
}
backupPodUrecoverable := &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: backup.Namespace,
Name: backup.Name,
},
Status: corev1api.PodStatus{
Phase: corev1api.PodFailed,
},
}
backupPod := &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: backup.Namespace,
Name: backup.Name,
},
}
scheme := runtime.NewScheme()
corev1api.AddToScheme(scheme)
tests := []struct {
name string
kubeClientObj []runtime.Object
ownerBackup *velerov1.Backup
err string
}{
{
name: "backup pod is not found",
ownerBackup: backup,
},
{
name: "pod is unrecoverable",
ownerBackup: backup,
kubeClientObj: []runtime.Object{
backupPodUrecoverable,
},
err: "Pod is in abnormal state [Failed], message []",
},
{
name: "succeed",
ownerBackup: backup,
kubeClientObj: []runtime.Object{
backupPod,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
exposer := podVolumeExposer{
kubeClient: fakeKubeClient,
log: velerotest.NewLogger(),
}
var ownerObject corev1api.ObjectReference
if test.ownerBackup != nil {
ownerObject = corev1api.ObjectReference{
Kind: test.ownerBackup.Kind,
Namespace: test.ownerBackup.Namespace,
Name: test.ownerBackup.Name,
UID: test.ownerBackup.UID,
APIVersion: test.ownerBackup.APIVersion,
}
}
err := exposer.PeekExposed(context.Background(), ownerObject)
if test.err == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, test.err)
}
})
}
}
func TestPodVolumeDiagnoseExpose(t *testing.T) {
backup := &velerov1.Backup{
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1.SchemeGroupVersion.String(),
Kind: "Backup",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-backup",
UID: "fake-uid",
},
}
backupPodWithoutNodeName := corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-backup",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: backup.APIVersion,
Kind: backup.Kind,
Name: backup.Name,
UID: backup.UID,
},
},
},
Status: corev1api.PodStatus{
Phase: corev1api.PodPending,
Conditions: []corev1api.PodCondition{
{
Type: corev1api.PodInitialized,
Status: corev1api.ConditionTrue,
Message: "fake-pod-message",
},
},
},
}
backupPodWithNodeName := corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-backup",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: backup.APIVersion,
Kind: backup.Kind,
Name: backup.Name,
UID: backup.UID,
},
},
},
Spec: corev1api.PodSpec{
NodeName: "fake-node",
},
Status: corev1api.PodStatus{
Phase: corev1api.PodPending,
Conditions: []corev1api.PodCondition{
{
Type: corev1api.PodInitialized,
Status: corev1api.ConditionTrue,
Message: "fake-pod-message",
},
},
},
}
nodeAgentPod := corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "node-agent-pod-1",
Labels: map[string]string{"role": "node-agent"},
},
Spec: corev1api.PodSpec{
NodeName: "fake-node",
},
Status: corev1api.PodStatus{
Phase: corev1api.PodRunning,
},
}
tests := []struct {
name string
ownerBackup *velerov1.Backup
kubeClientObj []runtime.Object
snapshotClientObj []runtime.Object
expected string
}{
{
name: "no pod",
ownerBackup: backup,
expected: `begin diagnose pod volume exposer
error getting hosting pod fake-backup, err: pods "fake-backup" not found
end diagnose pod volume exposer`,
},
{
name: "pod without node name, pvc without volume name, vs without status",
ownerBackup: backup,
kubeClientObj: []runtime.Object{
&backupPodWithoutNodeName,
},
expected: `begin diagnose pod volume exposer
Pod velero/fake-backup, phase Pending, node name
Pod condition Initialized, status True, reason , message fake-pod-message
end diagnose pod volume exposer`,
},
{
name: "pod without node name",
ownerBackup: backup,
kubeClientObj: []runtime.Object{
&backupPodWithoutNodeName,
},
expected: `begin diagnose pod volume exposer
Pod velero/fake-backup, phase Pending, node name
Pod condition Initialized, status True, reason , message fake-pod-message
end diagnose pod volume exposer`,
},
{
name: "pod with node name, no node agent",
ownerBackup: backup,
kubeClientObj: []runtime.Object{
&backupPodWithNodeName,
},
expected: `begin diagnose pod volume exposer
Pod velero/fake-backup, phase Pending, node name fake-node
Pod condition Initialized, status True, reason , message fake-pod-message
node-agent is not running in node fake-node, err: daemonset pod not found in running state in node fake-node
end diagnose pod volume exposer`,
},
{
name: "pod with node name, node agent is running",
ownerBackup: backup,
kubeClientObj: []runtime.Object{
&backupPodWithNodeName,
&nodeAgentPod,
},
expected: `begin diagnose pod volume exposer
Pod velero/fake-backup, phase Pending, node name fake-node
Pod condition Initialized, status True, reason , message fake-pod-message
end diagnose pod volume exposer`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(tt.kubeClientObj...)
e := &podVolumeExposer{
kubeClient: fakeKubeClient,
log: velerotest.NewLogger(),
}
var ownerObject corev1api.ObjectReference
if tt.ownerBackup != nil {
ownerObject = corev1api.ObjectReference{
Kind: tt.ownerBackup.Kind,
Namespace: tt.ownerBackup.Namespace,
Name: tt.ownerBackup.Name,
UID: tt.ownerBackup.UID,
APIVersion: tt.ownerBackup.APIVersion,
}
}
diag := e.DiagnoseExpose(context.Background(), ownerObject)
assert.Equal(t, tt.expected, diag)
})
}
}

View File

@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/vmware-tanzu/velero/internal/velero"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
)
func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1api.DaemonSet {
@ -126,8 +127,8 @@ func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1api.DaemonSet
},
VolumeMounts: []corev1api.VolumeMount{
{
Name: "host-pods",
MountPath: "/host_pods",
Name: nodeagent.HostPodVolumeMount,
MountPath: nodeagent.HostPodVolumeMountPath(),
MountPropagation: &mountPropagationMode,
},
{

View File

@ -41,6 +41,12 @@ const (
// nodeAgentRole marks pods with node-agent role on all nodes.
nodeAgentRole = "node-agent"
// HostPodVolumeMount is the name of the volume in node-agent for host-pod mount
HostPodVolumeMount = "host-pods"
// HostPodVolumeMountPoint is the mount point of the volume in node-agent for host-pod mount
HostPodVolumeMountPoint = "host_pods"
)
var (
@ -249,3 +255,45 @@ func GetAnnotationValue(ctx context.Context, kubeClient kubernetes.Interface, na
return val, nil
}
func GetHostPodPath(ctx context.Context, kubeClient kubernetes.Interface, namespace string, osType string) (string, error) {
dsName := daemonSet
if osType == kube.NodeOSWindows {
dsName = daemonsetWindows
}
ds, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, dsName, metav1.GetOptions{})
if err != nil {
return "", errors.Wrapf(err, "error getting daemonset %s", dsName)
}
var volume *corev1api.Volume
for _, v := range ds.Spec.Template.Spec.Volumes {
if v.Name == HostPodVolumeMount {
volume = &v
break
}
}
if volume == nil {
return "", errors.New("host pod volume is not found")
}
if volume.HostPath == nil {
return "", errors.New("host pod volume is not a host path volume")
}
if volume.HostPath.Path == "" {
return "", errors.New("host pod volume path is empty")
}
return volume.HostPath.Path, nil
}
func HostPodVolumeMountPath() string {
return "/" + HostPodVolumeMountPoint
}
func HostPodVolumeMountPathWin() string {
return "\\" + HostPodVolumeMountPoint
}

View File

@ -590,3 +590,164 @@ func TestGetAnnotationValue(t *testing.T) {
})
}
}
func TestGetHostPodPath(t *testing.T) {
daemonSet := &appsv1api.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "node-agent",
},
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
},
}
daemonSetWithHostPodVolume := &appsv1api.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "node-agent",
},
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
},
Spec: appsv1api.DaemonSetSpec{
Template: corev1api.PodTemplateSpec{
Spec: corev1api.PodSpec{
Volumes: []corev1api.Volume{
{
Name: HostPodVolumeMount,
},
},
},
},
},
}
daemonSetWithHostPodVolumeAndEmptyPath := &appsv1api.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "node-agent",
},
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
},
Spec: appsv1api.DaemonSetSpec{
Template: corev1api.PodTemplateSpec{
Spec: corev1api.PodSpec{
Volumes: []corev1api.Volume{
{
Name: HostPodVolumeMount,
VolumeSource: corev1api.VolumeSource{
HostPath: &corev1api.HostPathVolumeSource{},
},
},
},
},
},
},
}
daemonSetWithHostPodVolumeAndValidPath := &appsv1api.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "node-agent",
},
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
},
Spec: appsv1api.DaemonSetSpec{
Template: corev1api.PodTemplateSpec{
Spec: corev1api.PodSpec{
Volumes: []corev1api.Volume{
{
Name: HostPodVolumeMount,
VolumeSource: corev1api.VolumeSource{
HostPath: &corev1api.HostPathVolumeSource{
Path: "/var/lib/kubelet/pods",
},
},
},
},
},
},
},
}
tests := []struct {
name string
kubeClientObj []runtime.Object
namespace string
osType string
expectedValue string
expectErr string
}{
{
name: "ds get error",
namespace: "fake-ns",
osType: kube.NodeOSWindows,
kubeClientObj: []runtime.Object{
daemonSet,
},
expectErr: "error getting daemonset node-agent-windows: daemonsets.apps \"node-agent-windows\" not found",
},
{
name: "no host pod volume",
namespace: "fake-ns",
osType: kube.NodeOSLinux,
kubeClientObj: []runtime.Object{
daemonSet,
},
expectErr: "host pod volume is not found",
},
{
name: "no host pod volume path",
namespace: "fake-ns",
osType: kube.NodeOSLinux,
kubeClientObj: []runtime.Object{
daemonSetWithHostPodVolume,
},
expectErr: "host pod volume is not a host path volume",
},
{
name: "empty host pod volume path",
namespace: "fake-ns",
osType: kube.NodeOSLinux,
kubeClientObj: []runtime.Object{
daemonSetWithHostPodVolumeAndEmptyPath,
},
expectErr: "host pod volume path is empty",
},
{
name: "succeed",
namespace: "fake-ns",
osType: kube.NodeOSLinux,
kubeClientObj: []runtime.Object{
daemonSetWithHostPodVolumeAndValidPath,
},
expectedValue: "/var/lib/kubelet/pods",
},
{
name: "succeed on empty os type",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
daemonSetWithHostPodVolumeAndValidPath,
},
expectedValue: "/var/lib/kubelet/pods",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
path, err := GetHostPodPath(context.TODO(), fakeKubeClient, test.namespace, test.osType)
if test.expectErr == "" {
assert.NoError(t, err)
assert.Equal(t, test.expectedValue, path)
} else {
assert.EqualError(t, err, test.expectErr)
}
})
}
}

View File

@ -25,7 +25,6 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
storagev1api "k8s.io/api/storage/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -33,8 +32,8 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
@ -148,8 +147,8 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core
// GetVolumeDirectory gets the name of the directory on the host, under /var/lib/kubelet/pods/<podUID>/volumes/,
// where the specified volume lives.
// For volumes with a CSIVolumeSource, append "/mount" to the directory name.
func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (string, error) {
pvc, pv, volume, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli)
func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) (string, error) {
pvc, pv, volume, err := GetPodPVCVolume(ctx, log, pod, volumeName, kubeClient)
if err != nil {
// This case implies the administrator created the PV and attached it directly, without PVC.
// Note that only one VolumeSource can be populated per Volume on a pod
@ -164,7 +163,7 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1
// Most common case is that we have a PVC VolumeSource, and we need to check the PV it points to for a CSI source.
// PV's been created with a CSI source.
isProvisionedByCSI, err := isProvisionedByCSI(log, pv, cli)
isProvisionedByCSI, err := isProvisionedByCSI(log, pv, kubeClient)
if err != nil {
return "", errors.WithStack(err)
}
@ -179,9 +178,9 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1
}
// GetVolumeMode gets the uploader.PersistentVolumeMode of the volume.
func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (
func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) (
uploader.PersistentVolumeMode, error) {
_, pv, _, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli)
_, pv, _, err := GetPodPVCVolume(ctx, log, pod, volumeName, kubeClient)
if err != nil {
if err == ErrorPodVolumeIsNotPVC {
@ -198,7 +197,7 @@ func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.P
// GetPodPVCVolume gets the PVC, PV and volume for a pod volume name.
// Returns pod volume in case of ErrorPodVolumeIsNotPVC error
func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (
func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) (
*corev1api.PersistentVolumeClaim, *corev1api.PersistentVolume, *corev1api.Volume, error) {
var volume *corev1api.Volume
@ -217,14 +216,12 @@ func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api
return nil, nil, volume, ErrorPodVolumeIsNotPVC // There is a pod volume but it is not a PVC
}
pvc := &corev1api.PersistentVolumeClaim{}
err := cli.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.VolumeSource.PersistentVolumeClaim.ClaimName}, pvc)
pvc, err := kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(ctx, volume.VolumeSource.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
if err != nil {
return nil, nil, nil, errors.WithStack(err)
}
pv := &corev1api.PersistentVolume{}
err = cli.Get(ctx, client.ObjectKey{Name: pvc.Spec.VolumeName}, pv)
pv, err := kubeClient.CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return nil, nil, nil, errors.WithStack(err)
}
@ -235,7 +232,7 @@ func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api
// isProvisionedByCSI function checks whether this is a CSI PV by annotation.
// Either "pv.kubernetes.io/provisioned-by" or "pv.kubernetes.io/migrated-to" indicates
// PV is provisioned by CSI.
func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, kbClient client.Client) (bool, error) {
func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, kubeClient kubernetes.Interface) (bool, error) {
if pv.Spec.CSI != nil {
return true, nil
}
@ -245,10 +242,11 @@ func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume,
driverName := pv.Annotations[KubeAnnDynamicallyProvisioned]
migratedDriver := pv.Annotations[KubeAnnMigratedTo]
if len(driverName) > 0 || len(migratedDriver) > 0 {
list := &storagev1api.CSIDriverList{}
if err := kbClient.List(context.TODO(), list); err != nil {
list, err := kubeClient.StorageV1().CSIDrivers().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false, err
}
for _, driver := range list.Items {
if driverName == driver.Name || migratedDriver == driver.Name {
log.Debugf("the annotation %s or %s equals to %s indicates the volume is provisioned by a CSI driver", KubeAnnDynamicallyProvisioned, KubeAnnMigratedTo, driver.Name)

View File

@ -34,11 +34,12 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/pkg/builder"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"k8s.io/client-go/kubernetes/fake"
)
func TestNamespaceAndName(t *testing.T) {
@ -216,17 +217,18 @@ func TestGetVolumeDirectorySuccess(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "csi.test.com"},
}
for _, tc := range tests {
clientBuilder := fake.NewClientBuilder().WithLists(&storagev1api.CSIDriverList{Items: []storagev1api.CSIDriver{csiDriver}})
objs := []runtime.Object{&csiDriver}
if tc.pvc != nil {
clientBuilder = clientBuilder.WithObjects(tc.pvc)
objs = append(objs, tc.pvc)
}
if tc.pv != nil {
clientBuilder = clientBuilder.WithObjects(tc.pv)
objs = append(objs, tc.pv)
}
fakeKubeClient := fake.NewSimpleClientset(objs...)
// Function under test
dir, err := GetVolumeDirectory(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build())
dir, err := GetVolumeDirectory(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, fakeKubeClient)
require.NoError(t, err)
assert.Equal(t, tc.want, dir)
@ -264,17 +266,18 @@ func TestGetVolumeModeSuccess(t *testing.T) {
}
for _, tc := range tests {
clientBuilder := fake.NewClientBuilder()
objs := []runtime.Object{}
if tc.pvc != nil {
clientBuilder = clientBuilder.WithObjects(tc.pvc)
objs = append(objs, tc.pvc)
}
if tc.pv != nil {
clientBuilder = clientBuilder.WithObjects(tc.pv)
objs = append(objs, tc.pv)
}
fakeKubeClient := fake.NewSimpleClientset(objs...)
// Function under test
mode, err := GetVolumeMode(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build())
mode, err := GetVolumeMode(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, fakeKubeClient)
require.NoError(t, err)
assert.Equal(t, tc.want, mode)