Merge branch 'main' into vgdp-ms-pvr-data-path
commit 73e1c8ae4a
|
@@ -0,0 +1 @@
|
|||
Fix issue #8960, implement PodVolume exposer for PVB/PVR
|
|
@@ -76,8 +76,6 @@ const (
|
|||
// the port where prometheus metrics are exposed
|
||||
defaultMetricsAddress = ":8085"
|
||||
|
||||
defaultHostPodsPath = "/host_pods"
|
||||
|
||||
defaultResourceTimeout = 10 * time.Minute
|
||||
defaultDataMoverPrepareTimeout = 30 * time.Minute
|
||||
defaultDataPathConcurrentNum = 1
|
||||
|
@@ -301,14 +299,14 @@ func (s *nodeAgentServer) run() {
|
|||
|
||||
credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
|
||||
repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout)
|
||||
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer,
|
||||
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer,
|
||||
credentialGetter, s.nodeName, s.mgr.GetScheme(), s.metrics, s.logger)
|
||||
|
||||
if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
|
||||
s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup)
|
||||
}
|
||||
|
||||
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil {
|
||||
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil {
|
||||
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
|
||||
}
|
||||
|
||||
|
@@ -412,10 +410,10 @@ func (s *nodeAgentServer) waitCacheForResume() error {
|
|||
// validatePodVolumesHostPath validates that the pod volumes path contains a
|
||||
// directory for each Pod running on this node
|
||||
func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface) error {
|
||||
files, err := s.fileSystem.ReadDir(defaultHostPodsPath)
|
||||
files, err := s.fileSystem.ReadDir(nodeagent.HostPodVolumeMountPath())
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
s.logger.Warnf("Pod volumes host path [%s] doesn't exist, fs-backup is disabled", defaultHostPodsPath)
|
||||
s.logger.Warnf("Pod volumes host path [%s] doesn't exist, fs-backup is disabled", nodeagent.HostPodVolumeMountPath())
|
||||
return nil
|
||||
}
|
||||
return errors.Wrap(err, "could not read pod volumes host path")
|
||||
|
@@ -448,7 +446,7 @@ func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface
|
|||
valid = false
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"pod": fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()),
|
||||
"path": defaultHostPodsPath + "/" + dirName,
|
||||
"path": nodeagent.HostPodVolumeMountPath() + "/" + dirName,
|
||||
}).Debug("could not find volumes for pod in host path")
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -99,7 +99,7 @@ func Test_validatePodVolumesHostPath(t *testing.T) {
|
|||
|
||||
for _, dir := range tt.dirs {
|
||||
if tt.createDir {
|
||||
err := fs.MkdirAll(filepath.Join(defaultHostPodsPath, dir), os.ModePerm)
|
||||
err := fs.MkdirAll(filepath.Join(nodeagent.HostPodVolumeMountPath(), dir), os.ModePerm)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
|
|
@@ -29,6 +29,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
clocks "k8s.io/utils/clock"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
@@ -48,10 +49,11 @@ import (
|
|||
const pVBRRequestor string = "pod-volume-backup-restore"
|
||||
|
||||
// NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance
|
||||
func NewPodVolumeBackupReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter,
|
||||
func NewPodVolumeBackupReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter,
|
||||
nodeName string, scheme *runtime.Scheme, metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler {
|
||||
return &PodVolumeBackupReconciler{
|
||||
Client: client,
|
||||
kubeClient: kubeClient,
|
||||
logger: logger.WithField("controller", "PodVolumeBackup"),
|
||||
repositoryEnsurer: ensurer,
|
||||
credentialGetter: credentialGetter,
|
||||
|
@@ -67,6 +69,7 @@ func NewPodVolumeBackupReconciler(client client.Client, dataPathMgr *datapath.Ma
|
|||
// PodVolumeBackupReconciler reconciles a PodVolumeBackup object
|
||||
type PodVolumeBackupReconciler struct {
|
||||
client.Client
|
||||
kubeClient kubernetes.Interface
|
||||
scheme *runtime.Scheme
|
||||
clock clocks.WithTickerAndDelayedExecution
|
||||
metrics *metrics.ServerMetrics
|
||||
|
@@ -155,7 +158,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
|
|||
return r.errorOut(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name), log)
|
||||
}
|
||||
|
||||
path, err := exposer.GetPodVolumeHostPath(ctx, &pod, pvb.Spec.Volume, r.Client, r.fileSystem, log)
|
||||
path, err := exposer.GetPodVolumeHostPath(ctx, &pod, pvb.Spec.Volume, r.kubeClient, r.fileSystem, log)
|
||||
if err != nil {
|
||||
r.closeDataPath(ctx, pvb.Name)
|
||||
return r.errorOut(ctx, &pvb, err, "error exposing host path for pod volume", log)
|
||||
|
|
|
@@ -30,6 +30,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
clocks "k8s.io/utils/clock"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
@@ -49,10 +50,11 @@ import (
|
|||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
)
|
||||
|
||||
func NewPodVolumeRestoreReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer,
|
||||
func NewPodVolumeRestoreReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer,
|
||||
credentialGetter *credentials.CredentialGetter, logger logrus.FieldLogger) *PodVolumeRestoreReconciler {
|
||||
return &PodVolumeRestoreReconciler{
|
||||
Client: client,
|
||||
kubeClient: kubeClient,
|
||||
logger: logger.WithField("controller", "PodVolumeRestore"),
|
||||
repositoryEnsurer: ensurer,
|
||||
credentialGetter: credentialGetter,
|
||||
|
@@ -64,6 +66,7 @@ func NewPodVolumeRestoreReconciler(client client.Client, dataPathMgr *datapath.M
|
|||
|
||||
type PodVolumeRestoreReconciler struct {
|
||||
client.Client
|
||||
kubeClient kubernetes.Interface
|
||||
logger logrus.FieldLogger
|
||||
repositoryEnsurer *repository.Ensurer
|
||||
credentialGetter *credentials.CredentialGetter
|
||||
|
@@ -135,7 +138,7 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
|
|||
return c.errorOut(ctx, pvr, err, "error to update status to in progress", log)
|
||||
}
|
||||
|
||||
volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.Client, c.fileSystem, log)
|
||||
volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.kubeClient, c.fileSystem, log)
|
||||
if err != nil {
|
||||
c.closeDataPath(ctx, pvr.Name)
|
||||
return c.errorOut(ctx, pvr, err, "error exposing host path for pod volume", log)
|
||||
|
|
|
@@ -19,13 +19,15 @@ package exposer
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/datapath"
|
||||
"github.com/vmware-tanzu/velero/pkg/nodeagent"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
|
@@ -37,17 +39,17 @@ var singlePathMatch = kube.SinglePathMatch
|
|||
|
||||
// GetPodVolumeHostPath returns a path that can be accessed from the host for a given volume of a pod
|
||||
func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName string,
|
||||
cli ctrlclient.Client, fs filesystem.Interface, log logrus.FieldLogger) (datapath.AccessPoint, error) {
|
||||
kubeClient kubernetes.Interface, fs filesystem.Interface, log logrus.FieldLogger) (datapath.AccessPoint, error) {
|
||||
logger := log.WithField("pod name", pod.Name).WithField("pod UID", pod.GetUID()).WithField("volume", volumeName)
|
||||
|
||||
volDir, err := getVolumeDirectory(ctx, logger, pod, volumeName, cli)
|
||||
volDir, err := getVolumeDirectory(ctx, logger, pod, volumeName, kubeClient)
|
||||
if err != nil {
|
||||
return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume directory name for volume %s in pod %s", volumeName, pod.Name)
|
||||
}
|
||||
|
||||
logger.WithField("volDir", volDir).Info("Got volume dir")
|
||||
|
||||
volMode, err := getVolumeMode(ctx, logger, pod, volumeName, cli)
|
||||
volMode, err := getVolumeMode(ctx, logger, pod, volumeName, kubeClient)
|
||||
if err != nil {
|
||||
return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume mode for volume %s in pod %s", volumeName, pod.Name)
|
||||
}
|
||||
|
@@ -57,7 +59,7 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName st
|
|||
volSubDir = "volumeDevices"
|
||||
}
|
||||
|
||||
pathGlob := fmt.Sprintf("/host_pods/%s/%s/*/%s", string(pod.GetUID()), volSubDir, volDir)
|
||||
pathGlob := fmt.Sprintf("%s/%s/%s/*/%s", nodeagent.HostPodVolumeMountPath(), string(pod.GetUID()), volSubDir, volDir)
|
||||
logger.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")
|
||||
|
||||
path, err := singlePathMatch(pathGlob, fs, logger)
|
||||
|
@@ -72,3 +74,22 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName st
|
|||
VolMode: volMode,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var getHostPodPath = nodeagent.GetHostPodPath
|
||||
|
||||
func ExtractPodVolumeHostPath(ctx context.Context, path string, kubeClient kubernetes.Interface, veleroNamespace string, osType string) (string, error) {
|
||||
podPath, err := getHostPodPath(ctx, kubeClient, veleroNamespace, osType)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "error getting host pod path from node-agent")
|
||||
}
|
||||
|
||||
if osType == kube.NodeOSWindows {
|
||||
podPath = strings.Replace(podPath, "/", "\\", -1)
|
||||
}
|
||||
|
||||
if osType == kube.NodeOSWindows {
|
||||
return strings.Replace(path, nodeagent.HostPodVolumeMountPathWin(), podPath, 1), nil
|
||||
} else {
|
||||
return strings.Replace(path, nodeagent.HostPodVolumeMountPath(), podPath, 1), nil
|
||||
}
|
||||
}
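For orientation, a minimal sketch (not part of the diff) of how the two helpers above combine: GetPodVolumeHostPath resolves the volume under the node-agent's "/host_pods" mount, and ExtractPodVolumeHostPath rewrites that prefix to the kubelet's real pod directory on the host. The wrapper name resolveHostVolumePath, the hard-coded velero namespace, and the Linux-only OS type are assumptions for illustration.

// Hypothetical helper, assumed to live in the exposer package.
func resolveHostVolumePath(ctx context.Context, pod *corev1api.Pod, volumeName string,
	kubeClient kubernetes.Interface, fs filesystem.Interface, log logrus.FieldLogger) (string, error) {
	// Resolves to e.g. "/host_pods/<pod-uid>/volumes/kubernetes.io~csi/<pvc-id>/mount",
	// i.e. a path valid inside the node-agent (and hosting) pod.
	ap, err := GetPodVolumeHostPath(ctx, pod, volumeName, kubeClient, fs, log)
	if err != nil {
		return "", err
	}

	// Rewrites the "/host_pods" prefix to the daemonset's hostPath (typically
	// "/var/lib/kubelet/pods"), yielding a path valid on the host itself.
	return ExtractPodVolumeHostPath(ctx, ap.ByPath, kubeClient, "velero", kube.NodeOSLinux)
}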
|
||||
|
|
|
@@ -18,26 +18,29 @@ package exposer
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/nodeagent"
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
)
|
||||
|
||||
func TestGetPodVolumeHostPath(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error)
|
||||
getVolumeModeFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error)
|
||||
getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error)
|
||||
getVolumeModeFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (uploader.PersistentVolumeMode, error)
|
||||
pathMatchFunc func(string, filesystem.Interface, logrus.FieldLogger) (string, error)
|
||||
pod *corev1api.Pod
|
||||
pvc string
|
||||
|
@@ -45,7 +48,7 @@ func TestGetPodVolumeHostPath(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
name: "get volume dir fail",
|
||||
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error) {
|
||||
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error) {
|
||||
return "", errors.New("fake-error-1")
|
||||
},
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, "fake-pod-1").Result(),
|
||||
|
@@ -54,10 +57,10 @@ func TestGetPodVolumeHostPath(t *testing.T) {
|
|||
},
|
||||
{
|
||||
name: "single path match fail",
|
||||
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error) {
|
||||
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error) {
|
||||
return "", nil
|
||||
},
|
||||
getVolumeModeFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error) {
|
||||
getVolumeModeFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (uploader.PersistentVolumeMode, error) {
|
||||
return uploader.PersistentVolumeFilesystem, nil
|
||||
},
|
||||
pathMatchFunc: func(string, filesystem.Interface, logrus.FieldLogger) (string, error) {
|
||||
|
@@ -69,7 +72,7 @@ func TestGetPodVolumeHostPath(t *testing.T) {
|
|||
},
|
||||
{
|
||||
name: "get block volume dir success",
|
||||
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (
|
||||
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (
|
||||
string, error) {
|
||||
return "fake-pvc-1", nil
|
||||
},
|
||||
|
@@ -102,3 +105,58 @@ func TestGetPodVolumeHostPath(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractPodVolumeHostPath(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
getHostPodPathFunc func(context.Context, kubernetes.Interface, string, string) (string, error)
|
||||
path string
|
||||
osType string
|
||||
expectedErr string
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "get host pod path error",
|
||||
getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) {
|
||||
return "", errors.New("fake-error-1")
|
||||
},
|
||||
|
||||
expectedErr: "error getting host pod path from node-agent: fake-error-1",
|
||||
},
|
||||
{
|
||||
name: "Windows os",
|
||||
getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) {
|
||||
return "/var/lib/kubelet/pods", nil
|
||||
},
|
||||
path: fmt.Sprintf("\\%s\\pod-id-xxx\\volumes\\kubernetes.io~csi\\pvc-id-xxx\\mount", nodeagent.HostPodVolumeMountPoint),
|
||||
osType: kube.NodeOSWindows,
|
||||
expected: "\\var\\lib\\kubelet\\pods\\pod-id-xxx\\volumes\\kubernetes.io~csi\\pvc-id-xxx\\mount",
|
||||
},
|
||||
{
|
||||
name: "linux OS",
|
||||
getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) {
|
||||
return "/var/lib/kubelet/pods", nil
|
||||
},
|
||||
path: fmt.Sprintf("/%s/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nodeagent.HostPodVolumeMountPoint),
|
||||
osType: kube.NodeOSLinux,
|
||||
expected: "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
if test.getHostPodPathFunc != nil {
|
||||
getHostPodPath = test.getHostPodPathFunc
|
||||
}
|
||||
|
||||
path, err := ExtractPodVolumeHostPath(context.Background(), test.path, nil, "", test.osType)
|
||||
|
||||
if test.expectedErr != "" {
|
||||
assert.EqualError(t, err, test.expectedErr)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, test.expected, path)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -0,0 +1,392 @@
|
|||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package exposer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/nodeagent"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
)
|
||||
|
||||
const (
|
||||
PodVolumeExposeTypeBackup = "pod-volume-backup"
|
||||
PodVolumeExposeTypeRestore = "pod-volume-restore"
|
||||
)
|
||||
|
||||
// PodVolumeExposeParam defines the input parameters for a pod volume expose
|
||||
type PodVolumeExposeParam struct {
|
||||
// ClientPodName is the name of pod to be backed up or restored
|
||||
ClientPodName string
|
||||
|
||||
// ClientNamespace is the namespace to be backed up or restored
|
||||
ClientNamespace string
|
||||
|
||||
// ClientPodVolume is the pod volume for the client PVC
|
||||
ClientPodVolume string
|
||||
|
||||
// HostingPodLabels are the labels to apply to the hosting pod
|
||||
HostingPodLabels map[string]string
|
||||
|
||||
// HostingPodAnnotations are the annotations to apply to the hosting pod
|
||||
HostingPodAnnotations map[string]string
|
||||
|
||||
// Resources defines the resource requirements of the hosting pod
|
||||
Resources corev1api.ResourceRequirements
|
||||
|
||||
// OperationTimeout specifies how long to wait for resource operations during Expose
|
||||
OperationTimeout time.Duration
|
||||
|
||||
// Type specifies the type of the expose, either backup or restore
|
||||
Type string
|
||||
}
|
||||
|
||||
// PodVolumeExposer is the interface for a pod volume exposer
|
||||
type PodVolumeExposer interface {
|
||||
// Expose starts the pod volume expose process; the process may take a long time
|
||||
Expose(context.Context, corev1api.ObjectReference, PodVolumeExposeParam) error
|
||||
|
||||
// GetExposed polls the status of the expose.
|
||||
// If the expose is accessible to the current caller, it waits for the expose to be ready and returns the expose result.
|
||||
// Otherwise, it returns nil as the expose result without an error.
|
||||
GetExposed(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error)
|
||||
|
||||
// PeekExposed tests the status of the expose.
|
||||
// If the expose is incomplete but not recoverable, it returns an error.
|
||||
// Otherwise, it returns nil immediately.
|
||||
PeekExposed(context.Context, corev1api.ObjectReference) error
|
||||
|
||||
// DiagnoseExpose generates diagnostic info when the expose has not finished for a long time.
|
||||
// If it finds any problem, it returns a string describing the problem.
|
||||
DiagnoseExpose(context.Context, corev1api.ObjectReference) string
|
||||
|
||||
// CleanUp cleans up any objects generated during the expose
|
||||
CleanUp(context.Context, corev1api.ObjectReference)
|
||||
}
|
||||
|
||||
// NewPodVolumeExposer creates a new instance of pod volume exposer
|
||||
func NewPodVolumeExposer(kubeClient kubernetes.Interface, log logrus.FieldLogger) PodVolumeExposer {
|
||||
return &podVolumeExposer{
|
||||
kubeClient: kubeClient,
|
||||
fs: filesystem.NewFileSystem(),
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
type podVolumeExposer struct {
|
||||
kubeClient kubernetes.Interface
|
||||
fs filesystem.Interface
|
||||
log logrus.FieldLogger
|
||||
}
|
||||
|
||||
var getPodVolumeHostPath = GetPodVolumeHostPath
|
||||
var extractPodVolumeHostPath = ExtractPodVolumeHostPath
|
||||
|
||||
func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.ObjectReference, param PodVolumeExposeParam) error {
|
||||
curLog := e.log.WithFields(logrus.Fields{
|
||||
"owner": ownerObject.Name,
|
||||
"client pod": param.ClientPodName,
|
||||
"client pod volume": param.ClientPodVolume,
|
||||
"client namespace": param.ClientNamespace,
|
||||
"type": param.Type,
|
||||
})
|
||||
|
||||
pod, err := e.kubeClient.CoreV1().Pods(param.ClientNamespace).Get(ctx, param.ClientPodName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error getting client pod %s", param.ClientPodName)
|
||||
}
|
||||
|
||||
if pod.Spec.NodeName == "" {
|
||||
return errors.Errorf("client pod %s doesn't have a node name", pod.Name)
|
||||
}
|
||||
|
||||
nodeOS, err := kube.GetNodeOS(ctx, pod.Spec.NodeName, e.kubeClient.CoreV1())
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error getting OS for node %s", pod.Spec.NodeName)
|
||||
}
|
||||
|
||||
curLog.Infof("Client pod is running in node %s, os %s", pod.Spec.NodeName, nodeOS)
|
||||
|
||||
path, err := getPodVolumeHostPath(ctx, pod, param.ClientPodVolume, e.kubeClient, e.fs, e.log)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error to get pod volume path")
|
||||
}
|
||||
|
||||
path.ByPath, err = extractPodVolumeHostPath(ctx, path.ByPath, e.kubeClient, ownerObject.Namespace, nodeOS)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error to extract pod volume path")
|
||||
}
|
||||
|
||||
curLog.WithField("path", path).Infof("Host path is retrieved for pod %s, volume %s", param.ClientPodName, param.ClientPodVolume)
|
||||
|
||||
hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, pod.Spec.NodeName, param.Resources, nodeOS)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error to create hosting pod")
|
||||
}
|
||||
|
||||
curLog.WithField("pod name", hostingPod.Name).Info("Hosting pod is created")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1api.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) {
|
||||
hostingPodName := ownerObject.Name
|
||||
|
||||
containerName := string(ownerObject.UID)
|
||||
volumeName := string(ownerObject.UID)
|
||||
|
||||
curLog := e.log.WithFields(logrus.Fields{
|
||||
"owner": ownerObject.Name,
|
||||
"node": nodeName,
|
||||
})
|
||||
|
||||
var updated *corev1api.Pod
|
||||
err := wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
|
||||
pod := &corev1api.Pod{}
|
||||
err := nodeClient.Get(ctx, types.NamespacedName{
|
||||
Namespace: ownerObject.Namespace,
|
||||
Name: hostingPodName,
|
||||
}, pod)
|
||||
|
||||
if err != nil {
|
||||
return false, errors.Wrapf(err, "error to get pod %s/%s", ownerObject.Namespace, hostingPodName)
|
||||
}
|
||||
|
||||
if pod.Status.Phase != corev1api.PodRunning {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
updated = pod
|
||||
|
||||
return true, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
curLog.WithField("hosting pod", hostingPodName).Debug("Hosting pod is not running in the current node")
|
||||
return nil, nil
|
||||
} else {
|
||||
return nil, errors.Wrapf(err, "error to wait for readiness of pod %s", hostingPodName)
|
||||
}
|
||||
}
|
||||
|
||||
curLog.WithField("pod", updated.Name).Infof("Hosting pod is in running state in node %s", updated.Spec.NodeName)
|
||||
|
||||
return &ExposeResult{ByPod: ExposeByPod{
|
||||
HostingPod: updated,
|
||||
HostingContainer: containerName,
|
||||
VolumeName: volumeName,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1api.ObjectReference) error {
|
||||
hostingPodName := ownerObject.Name
|
||||
|
||||
curLog := e.log.WithFields(logrus.Fields{
|
||||
"owner": ownerObject.Name,
|
||||
})
|
||||
|
||||
pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, hostingPodName, metav1.GetOptions{})
|
||||
if apierrors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
curLog.WithError(err).Warnf("error to peek hosting pod %s", hostingPodName)
|
||||
return nil
|
||||
}
|
||||
|
||||
if podFailed, message := kube.IsPodUnrecoverable(pod, curLog); podFailed {
|
||||
return errors.New(message)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev1api.ObjectReference) string {
|
||||
hostingPodName := ownerObject.Name
|
||||
|
||||
diag := "begin diagnose pod volume exposer\n"
|
||||
|
||||
pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, hostingPodName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
pod = nil
|
||||
diag += fmt.Sprintf("error getting hosting pod %s, err: %v\n", hostingPodName, err)
|
||||
}
|
||||
|
||||
if pod != nil {
|
||||
diag += kube.DiagnosePod(pod)
|
||||
|
||||
if pod.Spec.NodeName != "" {
|
||||
if err := nodeagent.KbClientIsRunningInNode(ctx, ownerObject.Namespace, pod.Spec.NodeName, e.kubeClient); err != nil {
|
||||
diag += fmt.Sprintf("node-agent is not running in node %s, err: %v\n", pod.Spec.NodeName, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
diag += "end diagnose pod volume exposer"
|
||||
|
||||
return diag
|
||||
}
|
||||
|
||||
func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) {
|
||||
hostingPodName := ownerObject.Name
|
||||
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), hostingPodName, ownerObject.Namespace, e.log)
|
||||
}
|
||||
|
||||
func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string,
|
||||
operationTimeout time.Duration, label map[string]string, annotation map[string]string, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string) (*corev1api.Pod, error) {
|
||||
hostingPodName := ownerObject.Name
|
||||
|
||||
containerName := string(ownerObject.UID)
|
||||
clientVolumeName := string(ownerObject.UID)
|
||||
clientVolumePath := "/" + clientVolumeName
|
||||
|
||||
podInfo, err := getInheritedPodInfo(ctx, e.kubeClient, ownerObject.Namespace, nodeOS)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error to get inherited pod info from node-agent")
|
||||
}
|
||||
|
||||
var gracePeriod int64
|
||||
mountPropagation := corev1api.MountPropagationHostToContainer
|
||||
volumeMounts := []corev1api.VolumeMount{{
|
||||
Name: clientVolumeName,
|
||||
MountPath: clientVolumePath,
|
||||
MountPropagation: &mountPropagation,
|
||||
}}
|
||||
volumeMounts = append(volumeMounts, podInfo.volumeMounts...)
|
||||
|
||||
volumes := []corev1api.Volume{{
|
||||
Name: clientVolumeName,
|
||||
VolumeSource: corev1api.VolumeSource{
|
||||
HostPath: &corev1api.HostPathVolumeSource{
|
||||
Path: hostPath,
|
||||
},
|
||||
},
|
||||
}}
|
||||
volumes = append(volumes, podInfo.volumes...)
|
||||
|
||||
args := []string{
|
||||
fmt.Sprintf("--volume-path=%s", clientVolumePath),
|
||||
fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()),
|
||||
}
|
||||
|
||||
command := []string{
|
||||
"/velero",
|
||||
"pod-volume",
|
||||
}
|
||||
|
||||
if exposeType == PodVolumeExposeTypeBackup {
|
||||
args = append(args, fmt.Sprintf("--pod-volume-backup=%s", ownerObject.Name))
|
||||
command = append(command, "backup")
|
||||
} else {
|
||||
args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name))
|
||||
command = append(command, "restore")
|
||||
}
|
||||
|
||||
args = append(args, podInfo.logFormatArgs...)
|
||||
args = append(args, podInfo.logLevelArgs...)
|
||||
|
||||
var securityCtx *corev1api.PodSecurityContext
|
||||
nodeSelector := map[string]string{}
|
||||
podOS := corev1api.PodOS{}
|
||||
toleration := []corev1api.Toleration{}
|
||||
if nodeOS == kube.NodeOSWindows {
|
||||
userID := "ContainerAdministrator"
|
||||
securityCtx = &corev1api.PodSecurityContext{
|
||||
WindowsOptions: &corev1api.WindowsSecurityContextOptions{
|
||||
RunAsUserName: &userID,
|
||||
},
|
||||
}
|
||||
|
||||
nodeSelector[kube.NodeOSLabel] = kube.NodeOSWindows
|
||||
podOS.Name = kube.NodeOSWindows
|
||||
|
||||
toleration = append(toleration, corev1api.Toleration{
|
||||
Key: "os",
|
||||
Operator: "Equal",
|
||||
Effect: "NoSchedule",
|
||||
Value: "windows",
|
||||
})
|
||||
} else {
|
||||
userID := int64(0)
|
||||
securityCtx = &corev1api.PodSecurityContext{
|
||||
RunAsUser: &userID,
|
||||
}
|
||||
|
||||
nodeSelector[kube.NodeOSLabel] = kube.NodeOSLinux
|
||||
podOS.Name = kube.NodeOSLinux
|
||||
}
|
||||
|
||||
pod := &corev1api.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: hostingPodName,
|
||||
Namespace: ownerObject.Namespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: ownerObject.APIVersion,
|
||||
Kind: ownerObject.Kind,
|
||||
Name: ownerObject.Name,
|
||||
UID: ownerObject.UID,
|
||||
Controller: boolptr.True(),
|
||||
},
|
||||
},
|
||||
Labels: label,
|
||||
Annotations: annotation,
|
||||
},
|
||||
Spec: corev1api.PodSpec{
|
||||
NodeSelector: nodeSelector,
|
||||
OS: &podOS,
|
||||
Containers: []corev1api.Container{
|
||||
{
|
||||
Name: containerName,
|
||||
Image: podInfo.image,
|
||||
ImagePullPolicy: corev1api.PullNever,
|
||||
Command: command,
|
||||
Args: args,
|
||||
VolumeMounts: volumeMounts,
|
||||
Env: podInfo.env,
|
||||
EnvFrom: podInfo.envFrom,
|
||||
Resources: resources,
|
||||
},
|
||||
},
|
||||
ServiceAccountName: podInfo.serviceAccount,
|
||||
TerminationGracePeriodSeconds: &gracePeriod,
|
||||
Volumes: volumes,
|
||||
NodeName: selectedNode,
|
||||
RestartPolicy: corev1api.RestartPolicyNever,
|
||||
SecurityContext: securityCtx,
|
||||
Tolerations: toleration,
|
||||
},
|
||||
}
|
||||
|
||||
return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
|
||||
}
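As a usage note (not part of the change), a caller such as the PVB reconciler could drive the new exposer roughly as below. The helper name exposeAndWait, the 30-minute timeouts, and the ctx, pvb, kubeClient, nodeClient, nodeName, and log parameters are assumptions for illustration; velerov1api is the Velero v1 API package already used elsewhere in this diff.

// Minimal sketch of the exposer lifecycle, written as if inside the exposer package.
func exposeAndWait(ctx context.Context, pvb *velerov1api.PodVolumeBackup, kubeClient kubernetes.Interface,
	nodeClient client.Client, nodeName string, log logrus.FieldLogger) (*ExposeResult, error) {
	pvExposer := NewPodVolumeExposer(kubeClient, log)

	owner := corev1api.ObjectReference{
		Kind:       pvb.Kind,
		Namespace:  pvb.Namespace,
		Name:       pvb.Name,
		UID:        pvb.UID,
		APIVersion: pvb.APIVersion,
	}

	// Expose creates the hosting pod on the client pod's node, mounting the client
	// pod's volume through the node-agent's host-pods hostPath.
	err := pvExposer.Expose(ctx, owner, PodVolumeExposeParam{
		ClientNamespace:  pvb.Spec.Pod.Namespace,
		ClientPodName:    pvb.Spec.Pod.Name,
		ClientPodVolume:  pvb.Spec.Volume,
		OperationTimeout: 30 * time.Minute,
		Type:             PodVolumeExposeTypeBackup,
	})
	if err != nil {
		return nil, err
	}

	// GetExposed waits for the hosting pod to reach Running on this node; it returns
	// nil, nil when the pod is scheduled to another node. CleanUp deletes the hosting pod.
	result, err := pvExposer.GetExposed(ctx, owner, nodeClient, nodeName, 30*time.Minute)
	if err != nil {
		pvExposer.CleanUp(ctx, owner)
		return nil, err
	}

	// result.ByPod carries the hosting pod, container, and volume names for the data path.
	return result, nil
}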
|
|
@@ -0,0 +1,591 @@
|
|||
package exposer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
appsv1api "k8s.io/api/apps/v1"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/datapath"
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
)
|
||||
|
||||
func TestPodVolumeExpose(t *testing.T) {
|
||||
backup := &velerov1.Backup{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: velerov1.SchemeGroupVersion.String(),
|
||||
Kind: "Backup",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: velerov1.DefaultNamespace,
|
||||
Name: "fake-backup",
|
||||
UID: "fake-uid",
|
||||
},
|
||||
}
|
||||
|
||||
podWithNoNode := builder.ForPod("fake-ns", "fake-client-pod").Result()
|
||||
podWithNode := builder.ForPod("fake-ns", "fake-client-pod").NodeName("fake-node").Result()
|
||||
|
||||
node := builder.ForNode("fake-node").Result()
|
||||
|
||||
daemonSet := &appsv1api.DaemonSet{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "velero",
|
||||
Name: "node-agent",
|
||||
},
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "DaemonSet",
|
||||
APIVersion: appsv1api.SchemeGroupVersion.String(),
|
||||
},
|
||||
Spec: appsv1api.DaemonSetSpec{
|
||||
Template: corev1api.PodTemplateSpec{
|
||||
Spec: corev1api.PodSpec{
|
||||
Containers: []corev1api.Container{
|
||||
{
|
||||
Name: "node-agent",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
snapshotClientObj []runtime.Object
|
||||
kubeClientObj []runtime.Object
|
||||
ownerBackup *velerov1.Backup
|
||||
exposeParam PodVolumeExposeParam
|
||||
funcGetPodVolumeHostPath func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error)
|
||||
funcExtractPodVolumeHostPath func(context.Context, string, kubernetes.Interface, string, string) (string, error)
|
||||
err string
|
||||
}{
|
||||
{
|
||||
name: "get client pod fail",
|
||||
ownerBackup: backup,
|
||||
exposeParam: PodVolumeExposeParam{
|
||||
ClientNamespace: "fake-ns",
|
||||
ClientPodName: "fake-client-pod",
|
||||
},
|
||||
err: "error getting client pod fake-client-pod: pods \"fake-client-pod\" not found",
|
||||
},
|
||||
{
|
||||
name: "client pod with no node name",
|
||||
ownerBackup: backup,
|
||||
exposeParam: PodVolumeExposeParam{
|
||||
ClientNamespace: "fake-ns",
|
||||
ClientPodName: "fake-client-pod",
|
||||
},
|
||||
kubeClientObj: []runtime.Object{
|
||||
podWithNoNode,
|
||||
},
|
||||
err: "client pod fake-client-pod doesn't have a node name",
|
||||
},
|
||||
{
|
||||
name: "get node os fail",
|
||||
ownerBackup: backup,
|
||||
exposeParam: PodVolumeExposeParam{
|
||||
ClientNamespace: "fake-ns",
|
||||
ClientPodName: "fake-client-pod",
|
||||
},
|
||||
kubeClientObj: []runtime.Object{
|
||||
podWithNode,
|
||||
},
|
||||
err: "error getting OS for node fake-node: error getting node fake-node: nodes \"fake-node\" not found",
|
||||
},
|
||||
{
|
||||
name: "get pod volume path fail",
|
||||
ownerBackup: backup,
|
||||
exposeParam: PodVolumeExposeParam{
|
||||
ClientNamespace: "fake-ns",
|
||||
ClientPodName: "fake-client-pod",
|
||||
ClientPodVolume: "fake-client-volume",
|
||||
},
|
||||
kubeClientObj: []runtime.Object{
|
||||
podWithNode,
|
||||
node,
|
||||
},
|
||||
funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) {
|
||||
return datapath.AccessPoint{}, errors.New("fake-get-pod-volume-path-error")
|
||||
},
|
||||
err: "error to get pod volume path: fake-get-pod-volume-path-error",
|
||||
},
|
||||
{
|
||||
name: "extract pod volume path fail",
|
||||
ownerBackup: backup,
|
||||
exposeParam: PodVolumeExposeParam{
|
||||
ClientNamespace: "fake-ns",
|
||||
ClientPodName: "fake-client-pod",
|
||||
ClientPodVolume: "fake-client-volume",
|
||||
},
|
||||
kubeClientObj: []runtime.Object{
|
||||
podWithNode,
|
||||
node,
|
||||
},
|
||||
funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) {
|
||||
return datapath.AccessPoint{
|
||||
ByPath: "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount",
|
||||
}, nil
|
||||
},
|
||||
funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) {
|
||||
return "", errors.New("fake-extract-error")
|
||||
},
|
||||
err: "error to extract pod volume path: fake-extract-error",
|
||||
},
|
||||
{
|
||||
name: "create hosting pod fail",
|
||||
ownerBackup: backup,
|
||||
exposeParam: PodVolumeExposeParam{
|
||||
ClientNamespace: "fake-ns",
|
||||
ClientPodName: "fake-client-pod",
|
||||
ClientPodVolume: "fake-client-volume",
|
||||
},
|
||||
kubeClientObj: []runtime.Object{
|
||||
podWithNode,
|
||||
node,
|
||||
},
|
||||
funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) {
|
||||
return datapath.AccessPoint{
|
||||
ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount",
|
||||
}, nil
|
||||
},
|
||||
funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) {
|
||||
return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil
|
||||
},
|
||||
err: "error to create hosting pod: error to get inherited pod info from node-agent: error to get node-agent pod template: error to get node-agent daemonset: daemonsets.apps \"node-agent\" not found",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
ownerBackup: backup,
|
||||
exposeParam: PodVolumeExposeParam{
|
||||
ClientNamespace: "fake-ns",
|
||||
ClientPodName: "fake-client-pod",
|
||||
ClientPodVolume: "fake-client-volume",
|
||||
},
|
||||
kubeClientObj: []runtime.Object{
|
||||
podWithNode,
|
||||
node,
|
||||
daemonSet,
|
||||
},
|
||||
funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) {
|
||||
return datapath.AccessPoint{
|
||||
ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount",
|
||||
}, nil
|
||||
},
|
||||
funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) {
|
||||
return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
|
||||
|
||||
exposer := podVolumeExposer{
|
||||
kubeClient: fakeKubeClient,
|
||||
log: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
var ownerObject corev1api.ObjectReference
|
||||
if test.ownerBackup != nil {
|
||||
ownerObject = corev1api.ObjectReference{
|
||||
Kind: test.ownerBackup.Kind,
|
||||
Namespace: test.ownerBackup.Namespace,
|
||||
Name: test.ownerBackup.Name,
|
||||
UID: test.ownerBackup.UID,
|
||||
APIVersion: test.ownerBackup.APIVersion,
|
||||
}
|
||||
}
|
||||
|
||||
if test.funcGetPodVolumeHostPath != nil {
|
||||
getPodVolumeHostPath = test.funcGetPodVolumeHostPath
|
||||
}
|
||||
|
||||
if test.funcExtractPodVolumeHostPath != nil {
|
||||
extractPodVolumeHostPath = test.funcExtractPodVolumeHostPath
|
||||
}
|
||||
|
||||
err := exposer.Expose(context.Background(), ownerObject, test.exposeParam)
|
||||
if err == nil {
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = exposer.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(context.Background(), ownerObject.Name, metav1.GetOptions{})
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.EqualError(t, err, test.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPodVolumeExpose(t *testing.T) {
|
||||
backup := &velerov1.Backup{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: velerov1.SchemeGroupVersion.String(),
|
||||
Kind: "Backup",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: velerov1.DefaultNamespace,
|
||||
Name: "fake-backup",
|
||||
UID: "fake-uid",
|
||||
},
|
||||
}
|
||||
|
||||
backupPodNotRunning := builder.ForPod(backup.Namespace, backup.Name).Result()
|
||||
backupPodRunning := builder.ForPod(backup.Namespace, backup.Name).Phase(corev1api.PodRunning).Result()
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
corev1api.AddToScheme(scheme)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
kubeClientObj []runtime.Object
|
||||
ownerBackup *velerov1.Backup
|
||||
nodeName string
|
||||
Timeout time.Duration
|
||||
err string
|
||||
expectedResult *ExposeResult
|
||||
}{
|
||||
{
|
||||
name: "backup pod is not found",
|
||||
ownerBackup: backup,
|
||||
nodeName: "fake-node",
|
||||
},
|
||||
{
|
||||
name: "wait backup pod running fail",
|
||||
ownerBackup: backup,
|
||||
nodeName: "fake-node",
|
||||
kubeClientObj: []runtime.Object{
|
||||
backupPodNotRunning,
|
||||
},
|
||||
Timeout: time.Second,
|
||||
err: "error to wait for readiness of pod fake-backup: context deadline exceeded",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
ownerBackup: backup,
|
||||
nodeName: "fake-node",
|
||||
kubeClientObj: []runtime.Object{
|
||||
backupPodRunning,
|
||||
},
|
||||
Timeout: time.Second,
|
||||
expectedResult: &ExposeResult{
|
||||
ByPod: ExposeByPod{
|
||||
HostingPod: backupPodRunning,
|
||||
VolumeName: string(backup.UID),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
|
||||
|
||||
fakeClientBuilder := clientFake.NewClientBuilder()
|
||||
fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
|
||||
|
||||
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
|
||||
|
||||
exposer := podVolumeExposer{
|
||||
kubeClient: fakeKubeClient,
|
||||
log: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
var ownerObject corev1api.ObjectReference
|
||||
if test.ownerBackup != nil {
|
||||
ownerObject = corev1api.ObjectReference{
|
||||
Kind: test.ownerBackup.Kind,
|
||||
Namespace: test.ownerBackup.Namespace,
|
||||
Name: test.ownerBackup.Name,
|
||||
UID: test.ownerBackup.UID,
|
||||
APIVersion: test.ownerBackup.APIVersion,
|
||||
}
|
||||
}
|
||||
|
||||
result, err := exposer.GetExposed(context.Background(), ownerObject, fakeClient, test.nodeName, test.Timeout)
|
||||
if test.err == "" {
|
||||
assert.NoError(t, err)
|
||||
|
||||
if test.expectedResult == nil {
|
||||
assert.Nil(t, result)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, test.expectedResult.ByPod.VolumeName, result.ByPod.VolumeName)
|
||||
assert.Equal(t, test.expectedResult.ByPod.HostingPod.Name, result.ByPod.HostingPod.Name)
|
||||
}
|
||||
} else {
|
||||
assert.EqualError(t, err, test.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPodVolumePeekExpose(t *testing.T) {
|
||||
backup := &velerov1.Backup{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: velerov1.SchemeGroupVersion.String(),
|
||||
Kind: "Backup",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: velerov1.DefaultNamespace,
|
||||
Name: "fake-backup",
|
||||
UID: "fake-uid",
|
||||
},
|
||||
}
|
||||
|
||||
backupPodUnrecoverable := &corev1api.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: backup.Namespace,
|
||||
Name: backup.Name,
|
||||
},
|
||||
Status: corev1api.PodStatus{
|
||||
Phase: corev1api.PodFailed,
|
||||
},
|
||||
}
|
||||
|
||||
backupPod := &corev1api.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: backup.Namespace,
|
||||
Name: backup.Name,
|
||||
},
|
||||
}
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
corev1api.AddToScheme(scheme)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
kubeClientObj []runtime.Object
|
||||
ownerBackup *velerov1.Backup
|
||||
err string
|
||||
}{
|
||||
{
|
||||
name: "backup pod is not found",
|
||||
ownerBackup: backup,
|
||||
},
|
||||
{
|
||||
name: "pod is unrecoverable",
|
||||
ownerBackup: backup,
|
||||
kubeClientObj: []runtime.Object{
|
||||
backupPodUnrecoverable,
|
||||
},
|
||||
err: "Pod is in abnormal state [Failed], message []",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
ownerBackup: backup,
|
||||
kubeClientObj: []runtime.Object{
|
||||
backupPod,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
|
||||
|
||||
exposer := podVolumeExposer{
|
||||
kubeClient: fakeKubeClient,
|
||||
log: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
var ownerObject corev1api.ObjectReference
|
||||
if test.ownerBackup != nil {
|
||||
ownerObject = corev1api.ObjectReference{
|
||||
Kind: test.ownerBackup.Kind,
|
||||
Namespace: test.ownerBackup.Namespace,
|
||||
Name: test.ownerBackup.Name,
|
||||
UID: test.ownerBackup.UID,
|
||||
APIVersion: test.ownerBackup.APIVersion,
|
||||
}
|
||||
}
|
||||
|
||||
err := exposer.PeekExposed(context.Background(), ownerObject)
|
||||
if test.err == "" {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.EqualError(t, err, test.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPodVolumeDiagnoseExpose(t *testing.T) {
|
||||
backup := &velerov1.Backup{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: velerov1.SchemeGroupVersion.String(),
|
||||
Kind: "Backup",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: velerov1.DefaultNamespace,
|
||||
Name: "fake-backup",
|
||||
UID: "fake-uid",
|
||||
},
|
||||
}
|
||||
|
||||
backupPodWithoutNodeName := corev1api.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: velerov1.DefaultNamespace,
|
||||
Name: "fake-backup",
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: backup.APIVersion,
|
||||
Kind: backup.Kind,
|
||||
Name: backup.Name,
|
||||
UID: backup.UID,
|
||||
},
|
||||
},
|
||||
},
|
||||
Status: corev1api.PodStatus{
|
||||
Phase: corev1api.PodPending,
|
||||
Conditions: []corev1api.PodCondition{
|
||||
{
|
||||
Type: corev1api.PodInitialized,
|
||||
Status: corev1api.ConditionTrue,
|
||||
Message: "fake-pod-message",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
backupPodWithNodeName := corev1api.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: velerov1.DefaultNamespace,
|
||||
Name: "fake-backup",
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: backup.APIVersion,
|
||||
Kind: backup.Kind,
|
||||
Name: backup.Name,
|
||||
UID: backup.UID,
|
||||
},
|
||||
},
|
||||
},
|
||||
Spec: corev1api.PodSpec{
|
||||
NodeName: "fake-node",
|
||||
},
|
||||
Status: corev1api.PodStatus{
|
||||
Phase: corev1api.PodPending,
|
||||
Conditions: []corev1api.PodCondition{
|
||||
{
|
||||
Type: corev1api.PodInitialized,
|
||||
Status: corev1api.ConditionTrue,
|
||||
Message: "fake-pod-message",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
nodeAgentPod := corev1api.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: velerov1.DefaultNamespace,
|
||||
Name: "node-agent-pod-1",
|
||||
Labels: map[string]string{"role": "node-agent"},
|
||||
},
|
||||
Spec: corev1api.PodSpec{
|
||||
NodeName: "fake-node",
|
||||
},
|
||||
Status: corev1api.PodStatus{
|
||||
Phase: corev1api.PodRunning,
|
||||
},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
ownerBackup *velerov1.Backup
|
||||
kubeClientObj []runtime.Object
|
||||
snapshotClientObj []runtime.Object
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "no pod",
|
||||
ownerBackup: backup,
|
||||
expected: `begin diagnose pod volume exposer
|
||||
error getting hosting pod fake-backup, err: pods "fake-backup" not found
|
||||
end diagnose pod volume exposer`,
|
||||
},
|
||||
{
|
||||
name: "pod without node name, pvc without volume name, vs without status",
|
||||
ownerBackup: backup,
|
||||
kubeClientObj: []runtime.Object{
|
||||
&backupPodWithoutNodeName,
|
||||
},
|
||||
expected: `begin diagnose pod volume exposer
|
||||
Pod velero/fake-backup, phase Pending, node name
|
||||
Pod condition Initialized, status True, reason , message fake-pod-message
|
||||
end diagnose pod volume exposer`,
|
||||
},
|
||||
{
|
||||
name: "pod without node name",
|
||||
ownerBackup: backup,
|
||||
kubeClientObj: []runtime.Object{
|
||||
&backupPodWithoutNodeName,
|
||||
},
|
||||
expected: `begin diagnose pod volume exposer
|
||||
Pod velero/fake-backup, phase Pending, node name
|
||||
Pod condition Initialized, status True, reason , message fake-pod-message
|
||||
end diagnose pod volume exposer`,
|
||||
},
|
||||
{
|
||||
name: "pod with node name, no node agent",
|
||||
ownerBackup: backup,
|
||||
kubeClientObj: []runtime.Object{
|
||||
&backupPodWithNodeName,
|
||||
},
|
||||
expected: `begin diagnose pod volume exposer
|
||||
Pod velero/fake-backup, phase Pending, node name fake-node
|
||||
Pod condition Initialized, status True, reason , message fake-pod-message
|
||||
node-agent is not running in node fake-node, err: daemonset pod not found in running state in node fake-node
|
||||
end diagnose pod volume exposer`,
|
||||
},
|
||||
{
|
||||
name: "pod with node name, node agent is running",
|
||||
ownerBackup: backup,
|
||||
kubeClientObj: []runtime.Object{
|
||||
&backupPodWithNodeName,
|
||||
&nodeAgentPod,
|
||||
},
|
||||
expected: `begin diagnose pod volume exposer
|
||||
Pod velero/fake-backup, phase Pending, node name fake-node
|
||||
Pod condition Initialized, status True, reason , message fake-pod-message
|
||||
end diagnose pod volume exposer`,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
fakeKubeClient := fake.NewSimpleClientset(tt.kubeClientObj...)
|
||||
e := &podVolumeExposer{
|
||||
kubeClient: fakeKubeClient,
|
||||
log: velerotest.NewLogger(),
|
||||
}
|
||||
var ownerObject corev1api.ObjectReference
|
||||
if tt.ownerBackup != nil {
|
||||
ownerObject = corev1api.ObjectReference{
|
||||
Kind: tt.ownerBackup.Kind,
|
||||
Namespace: tt.ownerBackup.Namespace,
|
||||
Name: tt.ownerBackup.Name,
|
||||
UID: tt.ownerBackup.UID,
|
||||
APIVersion: tt.ownerBackup.APIVersion,
|
||||
}
|
||||
}
|
||||
|
||||
diag := e.DiagnoseExpose(context.Background(), ownerObject)
|
||||
assert.Equal(t, tt.expected, diag)
|
||||
})
|
||||
}
|
||||
}
|
|
@@ -25,6 +25,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/velero"
|
||||
"github.com/vmware-tanzu/velero/pkg/nodeagent"
|
||||
)
|
||||
|
||||
func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1api.DaemonSet {
|
||||
|
@@ -126,8 +127,8 @@ func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1api.DaemonSet
|
|||
},
|
||||
VolumeMounts: []corev1api.VolumeMount{
|
||||
{
|
||||
Name: "host-pods",
|
||||
MountPath: "/host_pods",
|
||||
Name: nodeagent.HostPodVolumeMount,
|
||||
MountPath: nodeagent.HostPodVolumeMountPath(),
|
||||
MountPropagation: &mountPropagationMode,
|
||||
},
|
||||
{
|
||||
|
|
|
@@ -41,6 +41,12 @@ const (
|
|||
|
||||
// nodeAgentRole marks pods with node-agent role on all nodes.
|
||||
nodeAgentRole = "node-agent"
|
||||
|
||||
// HostPodVolumeMount is the name of the volume in node-agent for host-pod mount
|
||||
HostPodVolumeMount = "host-pods"
|
||||
|
||||
// HostPodVolumeMountPoint is the mount point of the volume in node-agent for host-pod mount
|
||||
HostPodVolumeMountPoint = "host_pods"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@@ -249,3 +255,45 @@ func GetAnnotationValue(ctx context.Context, kubeClient kubernetes.Interface, na
|
|||
|
||||
return val, nil
|
||||
}
|
||||
|
||||
func GetHostPodPath(ctx context.Context, kubeClient kubernetes.Interface, namespace string, osType string) (string, error) {
|
||||
dsName := daemonSet
|
||||
if osType == kube.NodeOSWindows {
|
||||
dsName = daemonsetWindows
|
||||
}
|
||||
|
||||
ds, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, dsName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "error getting daemonset %s", dsName)
|
||||
}
|
||||
|
||||
var volume *corev1api.Volume
|
||||
for _, v := range ds.Spec.Template.Spec.Volumes {
|
||||
if v.Name == HostPodVolumeMount {
|
||||
volume = &v
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if volume == nil {
|
||||
return "", errors.New("host pod volume is not found")
|
||||
}
|
||||
|
||||
if volume.HostPath == nil {
|
||||
return "", errors.New("host pod volume is not a host path volume")
|
||||
}
|
||||
|
||||
if volume.HostPath.Path == "" {
|
||||
return "", errors.New("host pod volume path is empty")
|
||||
}
|
||||
|
||||
return volume.HostPath.Path, nil
|
||||
}
|
||||
|
||||
func HostPodVolumeMountPath() string {
|
||||
return "/" + HostPodVolumeMountPoint
|
||||
}
|
||||
|
||||
func HostPodVolumeMountPathWin() string {
|
||||
return "\\" + HostPodVolumeMountPoint
|
||||
}
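A short illustration (not part of the change) of the mapping these helpers encode between the in-container mount point and the host path read from the daemonset. The helper name hostPathFor, the example containerPath, and the hard-coded velero namespace are assumptions; the sketch is written as if inside the nodeagent package and additionally assumes a "strings" import.

// Hypothetical helper showing the path translation that the exposer's
// ExtractPodVolumeHostPath performs with these building blocks.
func hostPathFor(ctx context.Context, kubeClient kubernetes.Interface, containerPath string) (string, error) {
	// containerPath is a path under HostPodVolumeMountPath(), e.g.
	// "/host_pods/<pod-uid>/volumes/kubernetes.io~csi/<pvc-id>/mount".

	// GetHostPodPath reads the hostPath of the "host-pods" volume from the
	// node-agent daemonset (node-agent-windows when osType is kube.NodeOSWindows).
	hostRoot, err := GetHostPodPath(ctx, kubeClient, "velero", kube.NodeOSLinux)
	if err != nil {
		return "", err
	}

	// Swap the in-container mount point for the on-host root, typically yielding
	// "/var/lib/kubelet/pods/<pod-uid>/volumes/kubernetes.io~csi/<pvc-id>/mount".
	return strings.Replace(containerPath, HostPodVolumeMountPath(), hostRoot, 1), nil
}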
|
||||
|
|
|
@@ -590,3 +590,164 @@ func TestGetAnnotationValue(t *testing.T) {
|
|||
})
|
}
}

func TestGetHostPodPath(t *testing.T) {
    daemonSet := &appsv1api.DaemonSet{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: "fake-ns",
            Name:      "node-agent",
        },
        TypeMeta: metav1.TypeMeta{
            Kind: "DaemonSet",
        },
    }

    daemonSetWithHostPodVolume := &appsv1api.DaemonSet{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: "fake-ns",
            Name:      "node-agent",
        },
        TypeMeta: metav1.TypeMeta{
            Kind: "DaemonSet",
        },
        Spec: appsv1api.DaemonSetSpec{
            Template: corev1api.PodTemplateSpec{
                Spec: corev1api.PodSpec{
                    Volumes: []corev1api.Volume{
                        {
                            Name: HostPodVolumeMount,
                        },
                    },
                },
            },
        },
    }

    daemonSetWithHostPodVolumeAndEmptyPath := &appsv1api.DaemonSet{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: "fake-ns",
            Name:      "node-agent",
        },
        TypeMeta: metav1.TypeMeta{
            Kind: "DaemonSet",
        },
        Spec: appsv1api.DaemonSetSpec{
            Template: corev1api.PodTemplateSpec{
                Spec: corev1api.PodSpec{
                    Volumes: []corev1api.Volume{
                        {
                            Name: HostPodVolumeMount,
                            VolumeSource: corev1api.VolumeSource{
                                HostPath: &corev1api.HostPathVolumeSource{},
                            },
                        },
                    },
                },
            },
        },
    }

    daemonSetWithHostPodVolumeAndValidPath := &appsv1api.DaemonSet{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: "fake-ns",
            Name:      "node-agent",
        },
        TypeMeta: metav1.TypeMeta{
            Kind: "DaemonSet",
        },
        Spec: appsv1api.DaemonSetSpec{
            Template: corev1api.PodTemplateSpec{
                Spec: corev1api.PodSpec{
                    Volumes: []corev1api.Volume{
                        {
                            Name: HostPodVolumeMount,
                            VolumeSource: corev1api.VolumeSource{
                                HostPath: &corev1api.HostPathVolumeSource{
                                    Path: "/var/lib/kubelet/pods",
                                },
                            },
                        },
                    },
                },
            },
        },
    }

    tests := []struct {
        name          string
        kubeClientObj []runtime.Object
        namespace     string
        osType        string
        expectedValue string
        expectErr     string
    }{
        {
            name:      "ds get error",
            namespace: "fake-ns",
            osType:    kube.NodeOSWindows,
            kubeClientObj: []runtime.Object{
                daemonSet,
            },
            expectErr: "error getting daemonset node-agent-windows: daemonsets.apps \"node-agent-windows\" not found",
        },
        {
            name:      "no host pod volume",
            namespace: "fake-ns",
            osType:    kube.NodeOSLinux,
            kubeClientObj: []runtime.Object{
                daemonSet,
            },
            expectErr: "host pod volume is not found",
        },
        {
            name:      "no host pod volume path",
            namespace: "fake-ns",
            osType:    kube.NodeOSLinux,
            kubeClientObj: []runtime.Object{
                daemonSetWithHostPodVolume,
            },
            expectErr: "host pod volume is not a host path volume",
        },
        {
            name:      "empty host pod volume path",
            namespace: "fake-ns",
            osType:    kube.NodeOSLinux,
            kubeClientObj: []runtime.Object{
                daemonSetWithHostPodVolumeAndEmptyPath,
            },
            expectErr: "host pod volume path is empty",
        },
        {
            name:      "succeed",
            namespace: "fake-ns",
            osType:    kube.NodeOSLinux,
            kubeClientObj: []runtime.Object{
                daemonSetWithHostPodVolumeAndValidPath,
            },
            expectedValue: "/var/lib/kubelet/pods",
        },
        {
            name:      "succeed on empty os type",
            namespace: "fake-ns",
            kubeClientObj: []runtime.Object{
                daemonSetWithHostPodVolumeAndValidPath,
            },
            expectedValue: "/var/lib/kubelet/pods",
        },
    }

    for _, test := range tests {
        t.Run(test.name, func(t *testing.T) {
            fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)

            path, err := GetHostPodPath(context.TODO(), fakeKubeClient, test.namespace, test.osType)

            if test.expectErr == "" {
                assert.NoError(t, err)
                assert.Equal(t, test.expectedValue, path)
            } else {
                assert.EqualError(t, err, test.expectErr)
            }
        })
    }
}
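
For reference, here is a minimal sketch of the behavior the table-driven cases above exercise. The helper name getHostPodPathSketch and the exact daemonset lookup are assumptions made for illustration; only the "node-agent" / "node-agent-windows" names, the constant HostPodVolumeMount, and the error strings come from the test expectations, and the real GetHostPodPath in the nodeagent package may differ in details.

// getHostPodPathSketch illustrates, under the assumptions stated above, how the
// host pods path could be read from the node-agent daemonset's pod template.
func getHostPodPathSketch(ctx context.Context, kubeClient kubernetes.Interface, namespace, osType string) (string, error) {
    dsName := "node-agent"
    if osType == kube.NodeOSWindows {
        dsName = "node-agent-windows"
    }

    // Look up the node-agent daemonset in the install namespace.
    ds, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, dsName, metav1.GetOptions{})
    if err != nil {
        return "", errors.Wrapf(err, "error getting daemonset %s", dsName)
    }

    // Find the host-pods volume and validate that it is a non-empty hostPath volume.
    for _, v := range ds.Spec.Template.Spec.Volumes {
        if v.Name != HostPodVolumeMount {
            continue
        }
        if v.HostPath == nil {
            return "", errors.New("host pod volume is not a host path volume")
        }
        if v.HostPath.Path == "" {
            return "", errors.New("host pod volume path is empty")
        }
        return v.HostPath.Path, nil
    }

    return "", errors.New("host pod volume is not found")
}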

@ -25,7 +25,6 @@ import (
    "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
    corev1api "k8s.io/api/core/v1"
    storagev1api "k8s.io/api/storage/v1"
    apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"

@ -33,8 +32,8 @@ import (
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
    "github.com/vmware-tanzu/velero/pkg/label"

@ -148,8 +147,8 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core
// GetVolumeDirectory gets the name of the directory on the host, under /var/lib/kubelet/pods/<podUID>/volumes/,
// where the specified volume lives.
// For volumes with a CSIVolumeSource, append "/mount" to the directory name.
func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (string, error) {
    pvc, pv, volume, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli)
func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) (string, error) {
    pvc, pv, volume, err := GetPodPVCVolume(ctx, log, pod, volumeName, kubeClient)
    if err != nil {
        // This case implies the administrator created the PV and attached it directly, without PVC.
        // Note that only one VolumeSource can be populated per Volume on a pod

@ -164,7 +163,7 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1

    // Most common case is that we have a PVC VolumeSource, and we need to check the PV it points to for a CSI source.
    // PV's been created with a CSI source.
    isProvisionedByCSI, err := isProvisionedByCSI(log, pv, cli)
    isProvisionedByCSI, err := isProvisionedByCSI(log, pv, kubeClient)
    if err != nil {
        return "", errors.WithStack(err)
    }

@ -179,9 +178,9 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1
}

// GetVolumeMode gets the uploader.PersistentVolumeMode of the volume.
func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (
func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) (
    uploader.PersistentVolumeMode, error) {
    _, pv, _, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli)
    _, pv, _, err := GetPodPVCVolume(ctx, log, pod, volumeName, kubeClient)

    if err != nil {
        if err == ErrorPodVolumeIsNotPVC {

@ -198,7 +197,7 @@ func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.P

// GetPodPVCVolume gets the PVC, PV and volume for a pod volume name.
// Returns pod volume in case of ErrorPodVolumeIsNotPVC error
func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (
func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) (
    *corev1api.PersistentVolumeClaim, *corev1api.PersistentVolume, *corev1api.Volume, error) {
    var volume *corev1api.Volume

@ -217,14 +216,12 @@ func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api
        return nil, nil, volume, ErrorPodVolumeIsNotPVC // There is a pod volume but it is not a PVC
    }

    pvc := &corev1api.PersistentVolumeClaim{}
    err := cli.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.VolumeSource.PersistentVolumeClaim.ClaimName}, pvc)
    pvc, err := kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(ctx, volume.VolumeSource.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
    if err != nil {
        return nil, nil, nil, errors.WithStack(err)
    }

    pv := &corev1api.PersistentVolume{}
    err = cli.Get(ctx, client.ObjectKey{Name: pvc.Spec.VolumeName}, pv)
    pv, err := kubeClient.CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
    if err != nil {
        return nil, nil, nil, errors.WithStack(err)
    }

@ -235,7 +232,7 @@ func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api
// isProvisionedByCSI function checks whether this is a CSI PV by annotation.
// Either "pv.kubernetes.io/provisioned-by" or "pv.kubernetes.io/migrated-to" indicates
// PV is provisioned by CSI.
func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, kbClient client.Client) (bool, error) {
func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, kubeClient kubernetes.Interface) (bool, error) {
    if pv.Spec.CSI != nil {
        return true, nil
    }

@ -245,10 +242,11 @@ func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume,
    driverName := pv.Annotations[KubeAnnDynamicallyProvisioned]
    migratedDriver := pv.Annotations[KubeAnnMigratedTo]
    if len(driverName) > 0 || len(migratedDriver) > 0 {
        list := &storagev1api.CSIDriverList{}
        if err := kbClient.List(context.TODO(), list); err != nil {
        list, err := kubeClient.StorageV1().CSIDrivers().List(context.TODO(), metav1.ListOptions{})
        if err != nil {
            return false, err
        }

        for _, driver := range list.Items {
            if driverName == driver.Name || migratedDriver == driver.Name {
                log.Debugf("the annotation %s or %s equals to %s indicates the volume is provisioned by a CSI driver", KubeAnnDynamicallyProvisioned, KubeAnnMigratedTo, driver.Name)
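
With this refactor, callers pass a client-go clientset (kubernetes.Interface) where a controller-runtime client used to go. The sketch below is illustrative only: volumeHostDir and kubeconfigPath are made-up names for this example, and the kubeconfig-based wiring is an assumption, not part of this change; only the GetVolumeDirectory signature comes from the diff above.

// volumeHostDir shows the assumed call pattern after the signature change:
// build a client-go clientset and hand it to the refactored helper.
package main

import (
    "context"

    "github.com/sirupsen/logrus"
    corev1api "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"

    "github.com/vmware-tanzu/velero/pkg/util/kube"
)

func volumeHostDir(ctx context.Context, kubeconfigPath string, pod *corev1api.Pod, volumeName string) (string, error) {
    // Load a rest.Config from a kubeconfig (placeholder wiring for the sketch).
    cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
    if err != nil {
        return "", err
    }
    // A typed clientset satisfies the kubernetes.Interface parameter.
    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        return "", err
    }
    return kube.GetVolumeDirectory(ctx, logrus.StandardLogger(), pod, volumeName, kubeClient)
}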

@ -34,11 +34,12 @@ import (
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "sigs.k8s.io/controller-runtime/pkg/client/fake"

    "github.com/vmware-tanzu/velero/pkg/builder"
    velerotest "github.com/vmware-tanzu/velero/pkg/test"
    "github.com/vmware-tanzu/velero/pkg/uploader"

    "k8s.io/client-go/kubernetes/fake"
)

func TestNamespaceAndName(t *testing.T) {

@ -216,17 +217,18 @@ func TestGetVolumeDirectorySuccess(t *testing.T) {
        ObjectMeta: metav1.ObjectMeta{Name: "csi.test.com"},
    }
    for _, tc := range tests {
        clientBuilder := fake.NewClientBuilder().WithLists(&storagev1api.CSIDriverList{Items: []storagev1api.CSIDriver{csiDriver}})

        objs := []runtime.Object{&csiDriver}
        if tc.pvc != nil {
            clientBuilder = clientBuilder.WithObjects(tc.pvc)
            objs = append(objs, tc.pvc)
        }
        if tc.pv != nil {
            clientBuilder = clientBuilder.WithObjects(tc.pv)
            objs = append(objs, tc.pv)
        }

        fakeKubeClient := fake.NewSimpleClientset(objs...)

        // Function under test
        dir, err := GetVolumeDirectory(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build())
        dir, err := GetVolumeDirectory(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, fakeKubeClient)

        require.NoError(t, err)
        assert.Equal(t, tc.want, dir)

@ -264,17 +266,18 @@ func TestGetVolumeModeSuccess(t *testing.T) {
    }

    for _, tc := range tests {
        clientBuilder := fake.NewClientBuilder()

        objs := []runtime.Object{}
        if tc.pvc != nil {
            clientBuilder = clientBuilder.WithObjects(tc.pvc)
            objs = append(objs, tc.pvc)
        }
        if tc.pv != nil {
            clientBuilder = clientBuilder.WithObjects(tc.pv)
            objs = append(objs, tc.pv)
        }

        fakeKubeClient := fake.NewSimpleClientset(objs...)

        // Function under test
        mode, err := GetVolumeMode(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build())
        mode, err := GetVolumeMode(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, fakeKubeClient)

        require.NoError(t, err)
        assert.Equal(t, tc.want, mode)
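
The test hunks above swap the controller-runtime fake client builder for client-go's fake clientset. A condensed, purely illustrative sketch of that setup pattern follows; the test name and the "fake-pv" object are placeholders and not taken from the tests above.

package kube

import (
    "context"
    "testing"

    corev1api "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/fake"
)

// TestFakeClientsetSketch illustrates the assumed setup: fake.NewSimpleClientset
// returns a kubernetes.Interface pre-seeded with objects, so the typed Get/List
// calls inside the refactored helpers resolve without a controller-runtime scheme.
func TestFakeClientsetSketch(t *testing.T) {
    pv := &corev1api.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "fake-pv"}}
    kubeClient := fake.NewSimpleClientset(pv)

    got, err := kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), "fake-pv", metav1.GetOptions{})
    if err != nil {
        t.Fatal(err)
    }
    if got.Name != "fake-pv" {
        t.Fatalf("unexpected PV name %q", got.Name)
    }
}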