/*
Copyright The Velero Contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package exposer

import (
	"context"
	"fmt"
	"time"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	corev1api "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/vmware-tanzu/velero/pkg/nodeagent"
	"github.com/vmware-tanzu/velero/pkg/util/boolptr"
	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

const (
	PodVolumeExposeTypeBackup  = "pod-volume-backup"
	PodVolumeExposeTypeRestore = "pod-volume-restore"
)

// PodVolumeExposeParam defines the input params for a pod volume Expose
type PodVolumeExposeParam struct {
	// ClientPodName is the name of the pod to be backed up or restored
	ClientPodName string

	// ClientNamespace is the namespace to be backed up or restored
	ClientNamespace string

	// ClientPodVolume is the pod volume for the client PVC
	ClientPodVolume string

	// HostingPodLabels is the labels that are going to apply to the hosting pod
	HostingPodLabels map[string]string

	// HostingPodAnnotations is the annotations that are going to apply to the hosting pod
	HostingPodAnnotations map[string]string

	// HostingPodTolerations is the tolerations that are going to apply to the hosting pod
	HostingPodTolerations []corev1api.Toleration

	// Resources defines the resource requirements of the hosting pod
	Resources corev1api.ResourceRequirements

	// OperationTimeout specifies the time to wait for resource operations in Expose
	OperationTimeout time.Duration

	// Type specifies the type of the expose, either backup or restore
	Type string

	// PriorityClassName is the priority class name for the data mover pod
	PriorityClassName string
}

// PodVolumeExposer is the interface for a pod volume exposer
type PodVolumeExposer interface {
	// Expose starts the process of a pod volume expose; the expose process may take a long time
	Expose(context.Context, corev1api.ObjectReference, PodVolumeExposeParam) error

	// GetExposed polls the status of the expose.
	// If the expose is accessible by the current caller, it waits for the expose to be ready and returns the expose result.
	// Otherwise, it returns nil as the expose result without an error.
	GetExposed(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error)

	// PeekExposed tests the status of the expose.
	// If the expose is incomplete but not recoverable, it returns an error.
	// Otherwise, it returns nil immediately.
	PeekExposed(context.Context, corev1api.ObjectReference) error

	// DiagnoseExpose generates the diagnostic info when the expose has not finished for a long time.
	// If it finds any problem, it returns a string describing the problem.
	DiagnoseExpose(context.Context, corev1api.ObjectReference) string

	// CleanUp cleans up any objects generated during the expose
	CleanUp(context.Context, corev1api.ObjectReference)
}
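
// A minimal usage sketch (illustrative only; the kubeClient, logger, and ownerRef
// values below are assumptions — in Velero the owner reference is typically built
// from the owning PodVolumeBackup/PodVolumeRestore CR, whose name becomes the
// hosting pod name):
//
//	exposer := NewPodVolumeExposer(kubeClient, logger)
//	err := exposer.Expose(ctx, ownerRef, PodVolumeExposeParam{
//		ClientPodName:    "app-pod",
//		ClientNamespace:  "app-ns",
//		ClientPodVolume:  "data",
//		OperationTimeout: 10 * time.Minute,
//		Type:             PodVolumeExposeTypeBackup,
//	})
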
// NewPodVolumeExposer creates a new instance of pod volume exposer
func NewPodVolumeExposer(kubeClient kubernetes.Interface, log logrus.FieldLogger) PodVolumeExposer {
	return &podVolumeExposer{
		kubeClient: kubeClient,
		fs:         filesystem.NewFileSystem(),
		log:        log,
	}
}

type podVolumeExposer struct {
	kubeClient kubernetes.Interface
	fs         filesystem.Interface
	log        logrus.FieldLogger
}

var getPodVolumeHostPath = GetPodVolumeHostPath
var extractPodVolumeHostPath = ExtractPodVolumeHostPath

func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.ObjectReference, param PodVolumeExposeParam) error {
	curLog := e.log.WithFields(logrus.Fields{
		"owner":             ownerObject.Name,
		"client pod":        param.ClientPodName,
		"client pod volume": param.ClientPodVolume,
		"client namespace":  param.ClientNamespace,
		"type":              param.Type,
	})

	pod, err := e.kubeClient.CoreV1().Pods(param.ClientNamespace).Get(ctx, param.ClientPodName, metav1.GetOptions{})
	if err != nil {
		return errors.Wrapf(err, "error getting client pod %s", param.ClientPodName)
	}

	if pod.Spec.NodeName == "" {
		return errors.Errorf("client pod %s doesn't have a node name", pod.Name)
	}

	nodeOS, err := kube.GetNodeOS(ctx, pod.Spec.NodeName, e.kubeClient.CoreV1())
	if err != nil {
		return errors.Wrapf(err, "error getting OS for node %s", pod.Spec.NodeName)
	}

	curLog.Infof("Client pod is running in node %s, os %s", pod.Spec.NodeName, nodeOS)

	path, err := getPodVolumeHostPath(ctx, pod, param.ClientPodVolume, e.kubeClient, e.fs, e.log)
	if err != nil {
		return errors.Wrapf(err, "error getting pod volume path")
	}

	path.ByPath, err = extractPodVolumeHostPath(ctx, path.ByPath, e.kubeClient, ownerObject.Namespace, nodeOS)
	if err != nil {
		return errors.Wrapf(err, "error extracting pod volume path")
	}

	curLog.WithField("path", path).Infof("Host path is retrieved for pod %s, volume %s", param.ClientPodName, param.ClientPodVolume)

	hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, param.HostingPodTolerations, pod.Spec.NodeName, param.Resources, nodeOS, param.PriorityClassName)
	if err != nil {
		return errors.Wrapf(err, "error creating hosting pod")
	}

	curLog.WithField("pod name", hostingPod.Name).Info("Hosting pod is created")

	return nil
}

func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1api.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) {
	hostingPodName := ownerObject.Name
	containerName := string(ownerObject.UID)
	volumeName := string(ownerObject.UID)

	curLog := e.log.WithFields(logrus.Fields{
		"owner": ownerObject.Name,
		"node":  nodeName,
	})

	var updated *corev1api.Pod
	err := wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
		pod := &corev1api.Pod{}
		err := nodeClient.Get(ctx, types.NamespacedName{
			Namespace: ownerObject.Namespace,
			Name:      hostingPodName,
		}, pod)
		if err != nil {
			return false, errors.Wrapf(err, "error getting pod %s/%s", ownerObject.Namespace, hostingPodName)
		}

		if pod.Status.Phase != corev1api.PodRunning {
			return false, nil
		}

		updated = pod

		return true, nil
	})
	if err != nil {
		if apierrors.IsNotFound(err) {
			curLog.WithField("hosting pod", hostingPodName).Debug("Hosting pod is not running in the current node")
			return nil, nil
		}

		return nil, errors.Wrapf(err, "error waiting for readiness of pod %s", hostingPodName)
	}

	curLog.WithField("pod", updated.Name).Infof("Hosting pod is in running state in node %s", updated.Spec.NodeName)

	return &ExposeResult{ByPod: ExposeByPod{
		HostingPod:       updated,
		HostingContainer: containerName,
		VolumeName:       volumeName,
	}}, nil
}
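
// A minimal sketch of consuming the expose result from the node-agent side
// (illustrative only; exposer, ownerRef, nodeClient, nodeName, and timeout are
// assumed to come from the calling controller):
//
//	res, err := exposer.GetExposed(ctx, ownerRef, nodeClient, nodeName, timeout)
//	if err != nil {
//		// the expose failed or timed out
//	} else if res == nil {
//		// the hosting pod is not running on this node; another node-agent owns it
//	} else {
//		// the exposed volume is mounted in res.ByPod.HostingPod at /<res.ByPod.VolumeName>
//	}
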
func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1api.ObjectReference) error {
	hostingPodName := ownerObject.Name

	curLog := e.log.WithFields(logrus.Fields{
		"owner": ownerObject.Name,
	})

	pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, hostingPodName, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		return nil
	}

	if err != nil {
		curLog.WithError(err).Warnf("error peeking hosting pod %s", hostingPodName)
		return nil
	}

	if podFailed, message := kube.IsPodUnrecoverable(pod, curLog); podFailed {
		return errors.New(message)
	}

	return nil
}

func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev1api.ObjectReference) string {
	hostingPodName := ownerObject.Name

	diag := "begin diagnose pod volume exposer\n"

	pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, hostingPodName, metav1.GetOptions{})
	if err != nil {
		pod = nil
		diag += fmt.Sprintf("error getting hosting pod %s, err: %v\n", hostingPodName, err)
	}

	if pod != nil {
		diag += kube.DiagnosePod(pod)

		if pod.Spec.NodeName != "" {
			if err := nodeagent.KbClientIsRunningInNode(ctx, ownerObject.Namespace, pod.Spec.NodeName, e.kubeClient); err != nil {
				diag += fmt.Sprintf("node-agent is not running in node %s, err: %v\n", pod.Spec.NodeName, err)
			}
		}
	}

	diag += "end diagnose pod volume exposer"

	return diag
}

func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) {
	hostingPodName := ownerObject.Name
	kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), hostingPodName, ownerObject.Namespace, e.log)
}

func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string,
	operationTimeout time.Duration, label map[string]string, annotation map[string]string, toleration []corev1api.Toleration, selectedNode string,
	resources corev1api.ResourceRequirements, nodeOS string, priorityClassName string) (*corev1api.Pod, error) {
	hostingPodName := ownerObject.Name
	containerName := string(ownerObject.UID)
	clientVolumeName := string(ownerObject.UID)
	clientVolumePath := "/" + clientVolumeName

	podInfo, err := getInheritedPodInfo(ctx, e.kubeClient, ownerObject.Namespace, nodeOS)
	if err != nil {
		return nil, errors.Wrap(err, "error getting inherited pod info from node-agent")
	}

	// Log the priority class if it's set
	if priorityClassName != "" {
		e.log.Debugf("Setting priority class %q for data mover pod %s", priorityClassName, hostingPodName)
	}

	var gracePeriod int64
	mountPropagation := corev1api.MountPropagationHostToContainer

	volumeMounts := []corev1api.VolumeMount{{
		Name:             clientVolumeName,
		MountPath:        clientVolumePath,
		MountPropagation: &mountPropagation,
	}}
	volumeMounts = append(volumeMounts, podInfo.volumeMounts...)

	volumes := []corev1api.Volume{{
		Name: clientVolumeName,
		VolumeSource: corev1api.VolumeSource{
			HostPath: &corev1api.HostPathVolumeSource{
				Path: hostPath,
			},
		},
	}}
	volumes = append(volumes, podInfo.volumes...)
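
	// The hosting pod invokes the Velero binary in pod-volume mode against the
	// exposed volume. Illustrative command line for a backup-type expose (actual
	// values depend on the owner object's name/UID and the configured timeout):
	//
	//	/velero pod-volume backup --volume-path=/<owner-uid> --resource-timeout=10m0s --pod-volume-backup=<owner-name>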
	args := []string{
		fmt.Sprintf("--volume-path=%s", clientVolumePath),
		fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()),
	}

	command := []string{
		"/velero",
		"pod-volume",
	}

	if exposeType == PodVolumeExposeTypeBackup {
		args = append(args, fmt.Sprintf("--pod-volume-backup=%s", ownerObject.Name))
		command = append(command, "backup")
	} else {
		args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name))
		command = append(command, "restore")
	}

	args = append(args, podInfo.logFormatArgs...)
	args = append(args, podInfo.logLevelArgs...)

	var securityCtx *corev1api.PodSecurityContext
	nodeSelector := map[string]string{}
	podOS := corev1api.PodOS{}
	if nodeOS == kube.NodeOSWindows {
		userID := "ContainerAdministrator"
		securityCtx = &corev1api.PodSecurityContext{
			WindowsOptions: &corev1api.WindowsSecurityContextOptions{
				RunAsUserName: &userID,
			},
		}

		nodeSelector[kube.NodeOSLabel] = kube.NodeOSWindows
		podOS.Name = kube.NodeOSWindows

		toleration = append(toleration, []corev1api.Toleration{
			{
				Key:      "os",
				Operator: "Equal",
				Effect:   "NoSchedule",
				Value:    "windows",
			},
			{
				Key:      "os",
				Operator: "Equal",
				Effect:   "NoExecute",
				Value:    "windows",
			},
		}...)
	} else {
		userID := int64(0)
		securityCtx = &corev1api.PodSecurityContext{
			RunAsUser: &userID,
		}

		nodeSelector[kube.NodeOSLabel] = kube.NodeOSLinux
		podOS.Name = kube.NodeOSLinux
	}

	pod := &corev1api.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      hostingPodName,
			Namespace: ownerObject.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: ownerObject.APIVersion,
					Kind:       ownerObject.Kind,
					Name:       ownerObject.Name,
					UID:        ownerObject.UID,
					Controller: boolptr.True(),
				},
			},
			Labels:      label,
			Annotations: annotation,
		},
		Spec: corev1api.PodSpec{
			NodeSelector: nodeSelector,
			OS:           &podOS,
			Containers: []corev1api.Container{
				{
					Name:  containerName,
					Image: podInfo.image,
					// the image is inherited from the node-agent and must already be present
					// on the node, since the pod is pinned to it and never pulls
					ImagePullPolicy: corev1api.PullNever,
					Command:         command,
					Args:            args,
					VolumeMounts:    volumeMounts,
					Env:             podInfo.env,
					EnvFrom:         podInfo.envFrom,
					Resources:       resources,
				},
			},
			PriorityClassName:             priorityClassName,
			ServiceAccountName:            podInfo.serviceAccount,
			TerminationGracePeriodSeconds: &gracePeriod,
			Volumes:                       volumes,
			NodeName:                      selectedNode,
			RestartPolicy:                 corev1api.RestartPolicyNever,
			SecurityContext:               securityCtx,
			Tolerations:                   toleration,
			DNSPolicy:                     podInfo.dnsPolicy,
			DNSConfig:                     podInfo.dnsConfig,
			ImagePullSecrets:              podInfo.imagePullSecrets,
		},
	}

	return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}