// velero/pkg/exposer/pod_volume.go
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exposer
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
const (
	// PodVolumeExposeTypeBackup identifies an expose performed for a pod volume backup.
	PodVolumeExposeTypeBackup = "pod-volume-backup"
	// PodVolumeExposeTypeRestore identifies an expose performed for a pod volume restore.
	PodVolumeExposeTypeRestore = "pod-volume-restore"
)
// PodVolumeExposeParam define the input param for pod volume Expose
type PodVolumeExposeParam struct {
	// ClientPodName is the name of pod to be backed up or restored
	ClientPodName string

	// ClientNamespace is the namespace to be backed up or restored
	ClientNamespace string

	// ClientPodVolume is the pod volume for the client PVC
	ClientPodVolume string

	// HostingPodLabels is the labels that are going to apply to the hosting pod
	HostingPodLabels map[string]string

	// HostingPodAnnotations is the annotations that are going to apply to the hosting pod
	HostingPodAnnotations map[string]string

	// HostingPodTolerations is the tolerations that are going to apply to the hosting pod
	HostingPodTolerations []corev1api.Toleration

	// Resources defines the resource requirements of the hosting pod
	Resources corev1api.ResourceRequirements

	// OperationTimeout specifies the time wait for resources operations in Expose
	OperationTimeout time.Duration

	// Type specifies the type of the expose, either backup or restore;
	// expected to be one of PodVolumeExposeTypeBackup or PodVolumeExposeTypeRestore
	Type string

	// PriorityClassName is the priority class name for the data mover pod
	PriorityClassName string
}
// PodVolumeExposer is the interfaces for a pod volume exposer
type PodVolumeExposer interface {
	// Expose starts the process to a pod volume expose, the expose process may take long time
	Expose(context.Context, corev1api.ObjectReference, PodVolumeExposeParam) error

	// GetExposed polls the status of the expose.
	// If the expose is accessible by the current caller, it waits for the expose to be ready and returns the expose result.
	// Otherwise, it returns nil as the expose result without an error.
	GetExposed(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error)

	// PeekExposed tests the status of the expose.
	// If the expose is incomplete but not recoverable, it returns an error.
	// Otherwise, it returns nil immediately.
	PeekExposed(context.Context, corev1api.ObjectReference) error

	// DiagnoseExpose generates the diagnostic info when the expose is not finished for a long time.
	// If it finds any problem, it returns a string about the problem.
	DiagnoseExpose(context.Context, corev1api.ObjectReference) string

	// CleanUp cleans up any objects generated during the restore expose
	CleanUp(context.Context, corev1api.ObjectReference)
}
// NewPodVolumeExposer returns a PodVolumeExposer backed by the given
// Kubernetes client and logger, using the real filesystem implementation.
func NewPodVolumeExposer(kubeClient kubernetes.Interface, log logrus.FieldLogger) PodVolumeExposer {
	exposer := &podVolumeExposer{
		kubeClient: kubeClient,
		fs:         filesystem.NewFileSystem(),
		log:        log,
	}

	return exposer
}
// podVolumeExposer is the default implementation of PodVolumeExposer.
type podVolumeExposer struct {
	// kubeClient is used to read client pods/nodes and create/delete hosting pods
	kubeClient kubernetes.Interface
	// fs is the filesystem abstraction passed to the host-path helpers
	fs filesystem.Interface
	// log is the base logger; per-call fields are attached in each method
	log logrus.FieldLogger
}
// Indirections over the exported host-path helpers; presumably these exist so
// unit tests can substitute fakes — confirm against the package's test files.
var getPodVolumeHostPath = GetPodVolumeHostPath
var extractPodVolumeHostPath = ExtractPodVolumeHostPath
// Expose implements PodVolumeExposer. It locates the client pod, resolves the
// host path of the requested pod volume, and creates a hosting pod pinned to
// the client pod's node so the data mover can access that volume.
func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.ObjectReference, param PodVolumeExposeParam) error {
	log := e.log.WithFields(logrus.Fields{
		"owner":             ownerObject.Name,
		"client pod":        param.ClientPodName,
		"client pod volume": param.ClientPodVolume,
		"client namespace":  param.ClientNamespace,
		"type":              param.Type,
	})

	clientPod, err := e.kubeClient.CoreV1().Pods(param.ClientNamespace).Get(ctx, param.ClientPodName, metav1.GetOptions{})
	if err != nil {
		return errors.Wrapf(err, "error getting client pod %s", param.ClientPodName)
	}

	// The hosting pod must land on the same node as the client pod, so an
	// unscheduled client pod cannot be exposed.
	selectedNode := clientPod.Spec.NodeName
	if selectedNode == "" {
		return errors.Errorf("client pod %s doesn't have a node name", clientPod.Name)
	}

	nodeOS, err := kube.GetNodeOS(ctx, selectedNode, e.kubeClient.CoreV1())
	if err != nil {
		return errors.Wrapf(err, "error getting OS for node %s", selectedNode)
	}

	log.Infof("Client pod is running in node %s, os %s", selectedNode, nodeOS)

	volumePath, err := getPodVolumeHostPath(ctx, clientPod, param.ClientPodVolume, e.kubeClient, e.fs, e.log)
	if err != nil {
		return errors.Wrapf(err, "error to get pod volume path")
	}

	volumePath.ByPath, err = extractPodVolumeHostPath(ctx, volumePath.ByPath, e.kubeClient, ownerObject.Namespace, nodeOS)
	if err != nil {
		return errors.Wrapf(err, "error to extract pod volume path")
	}

	log.WithField("path", volumePath).Infof("Host path is retrieved for pod %s, volume %s", param.ClientPodName, param.ClientPodVolume)

	hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, volumePath.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, param.HostingPodTolerations, selectedNode, param.Resources, nodeOS, param.PriorityClassName)
	if err != nil {
		return errors.Wrapf(err, "error to create hosting pod")
	}

	log.WithField("pod name", hostingPod.Name).Info("Hosting pod is created")

	return nil
}
// GetExposed implements PodVolumeExposer. It polls (every 2s, up to timeout)
// for the hosting pod named after the owner to reach the Running phase via the
// node-scoped client. A NotFound result means the hosting pod is not visible
// to this node's client, which is treated as "not ours" and returns (nil, nil)
// rather than an error.
func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1api.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) {
	hostingPodName := ownerObject.Name
	// The hosting container and volume are both named after the owner UID;
	// see createHostingPod.
	containerName := string(ownerObject.UID)
	volumeName := string(ownerObject.UID)

	curLog := e.log.WithFields(logrus.Fields{
		"owner": ownerObject.Name,
		"node":  nodeName,
	})

	var updated *corev1api.Pod
	err := wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
		pod := &corev1api.Pod{}
		err := nodeClient.Get(ctx, types.NamespacedName{
			Namespace: ownerObject.Namespace,
			Name:      hostingPodName,
		}, pod)
		if err != nil {
			return false, errors.Wrapf(err, "error to get pod %s/%s", ownerObject.Namespace, hostingPodName)
		}

		if pod.Status.Phase != corev1api.PodRunning {
			return false, nil
		}

		updated = pod

		return true, nil
	})
	if err != nil {
		if apierrors.IsNotFound(err) {
			curLog.WithField("hosting pod", hostingPodName).Debug("Hosting pod is not running in the current node")
			return nil, nil
		}

		// Fixed typo in the returned error message: "rediness" -> "readiness".
		return nil, errors.Wrapf(err, "error to wait for readiness of pod %s", hostingPodName)
	}

	curLog.WithField("pod", updated.Name).Infof("Hosting pod is in running state in node %s", updated.Spec.NodeName)

	return &ExposeResult{ByPod: ExposeByPod{
		HostingPod:       updated,
		HostingContainer: containerName,
		VolumeName:       volumeName,
	}}, nil
}
// PeekExposed implements PodVolumeExposer. It returns an error only when the
// hosting pod exists and is in an unrecoverable state; a missing pod or a
// failed lookup is logged and treated as "nothing wrong yet".
func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1api.ObjectReference) error {
	curLog := e.log.WithFields(logrus.Fields{
		"owner": ownerObject.Name,
	})

	// The hosting pod shares its name with the owner object.
	pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, ownerObject.Name, metav1.GetOptions{})
	switch {
	case apierrors.IsNotFound(err):
		return nil
	case err != nil:
		curLog.WithError(err).Warnf("error to peek hosting pod %s", ownerObject.Name)
		return nil
	}

	if failed, message := kube.IsPodUnrecoverable(pod, curLog); failed {
		return errors.New(message)
	}

	return nil
}
// DiagnoseExpose implements PodVolumeExposer. It assembles a human-readable
// report about the hosting pod and, when the pod is scheduled, whether the
// node-agent is running on its node.
func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev1api.ObjectReference) string {
	podName := ownerObject.Name
	diag := "begin diagnose pod volume exposer\n"

	hostingPod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		hostingPod = nil
		diag += fmt.Sprintf("error getting hosting pod %s, err: %v\n", podName, err)
	}

	if hostingPod != nil {
		diag += kube.DiagnosePod(hostingPod)

		if nodeName := hostingPod.Spec.NodeName; nodeName != "" {
			if err := nodeagent.KbClientIsRunningInNode(ctx, ownerObject.Namespace, nodeName, e.kubeClient); err != nil {
				diag += fmt.Sprintf("node-agent is not running in node %s, err: %v\n", nodeName, err)
			}
		}
	}

	return diag + "end diagnose pod volume exposer"
}
// CleanUp implements PodVolumeExposer. It best-effort deletes the hosting pod
// (which shares its name with the owner object) if it still exists.
func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) {
	kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), ownerObject.Name, ownerObject.Namespace, e.log)
}
// createHostingPod creates the pod that hosts the data mover for a pod volume
// backup or restore. The client volume's host path is mounted into the pod at
// /<ownerObject.UID>, and the pod runs "/velero pod-volume backup" or
// "/velero pod-volume restore" depending on exposeType. The pod is pinned to
// selectedNode (the node running the client pod) and carries an owner
// reference so it is garbage collected with the owner object.
func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string,
	operationTimeout time.Duration, label map[string]string, annotation map[string]string, toleration []corev1api.Toleration, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string, priorityClassName string) (*corev1api.Pod, error) {
	// Pod is named after the owner; container and volume are named after the
	// owner UID (GetExposed relies on this naming).
	hostingPodName := ownerObject.Name
	containerName := string(ownerObject.UID)
	clientVolumeName := string(ownerObject.UID)
	clientVolumePath := "/" + clientVolumeName

	// Inherit image, env, volumes, DNS and related settings from the
	// node-agent deployment for the target OS.
	podInfo, err := getInheritedPodInfo(ctx, e.kubeClient, ownerObject.Namespace, nodeOS)
	if err != nil {
		return nil, errors.Wrap(err, "error to get inherited pod info from node-agent")
	}

	// Log the priority class if it's set
	if priorityClassName != "" {
		e.log.Debugf("Setting priority class %q for data mover pod %s", priorityClassName, hostingPodName)
	}

	// Zero grace period: the hosting pod can be killed immediately on delete.
	var gracePeriod int64

	// HostToContainer propagation so mounts appearing under the host path on
	// the node become visible inside the hosting container.
	mountPropagation := corev1api.MountPropagationHostToContainer
	volumeMounts := []corev1api.VolumeMount{{
		Name:             clientVolumeName,
		MountPath:        clientVolumePath,
		MountPropagation: &mountPropagation,
	}}
	volumeMounts = append(volumeMounts, podInfo.volumeMounts...)

	// The client volume is exposed as a hostPath volume rooted at hostPath.
	volumes := []corev1api.Volume{{
		Name: clientVolumeName,
		VolumeSource: corev1api.VolumeSource{
			HostPath: &corev1api.HostPathVolumeSource{
				Path: hostPath,
			},
		},
	}}
	volumes = append(volumes, podInfo.volumes...)

	args := []string{
		fmt.Sprintf("--volume-path=%s", clientVolumePath),
		fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()),
	}

	command := []string{
		"/velero",
		"pod-volume",
	}

	// Select the subcommand and the CR name flag based on the expose type;
	// anything other than backup is treated as restore.
	if exposeType == PodVolumeExposeTypeBackup {
		args = append(args, fmt.Sprintf("--pod-volume-backup=%s", ownerObject.Name))
		command = append(command, "backup")
	} else {
		args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name))
		command = append(command, "restore")
	}

	args = append(args, podInfo.logFormatArgs...)
	args = append(args, podInfo.logLevelArgs...)

	// OS-specific scheduling and security settings: Windows runs as
	// ContainerAdministrator and tolerates the "os=windows" taints; Linux
	// runs as root (uid 0).
	var securityCtx *corev1api.PodSecurityContext
	nodeSelector := map[string]string{}
	podOS := corev1api.PodOS{}
	if nodeOS == kube.NodeOSWindows {
		userID := "ContainerAdministrator"
		securityCtx = &corev1api.PodSecurityContext{
			WindowsOptions: &corev1api.WindowsSecurityContextOptions{
				RunAsUserName: &userID,
			},
		}

		nodeSelector[kube.NodeOSLabel] = kube.NodeOSWindows
		podOS.Name = kube.NodeOSWindows

		toleration = append(toleration, []corev1api.Toleration{
			{
				Key:      "os",
				Operator: "Equal",
				Effect:   "NoSchedule",
				Value:    "windows",
			},
			{
				Key:      "os",
				Operator: "Equal",
				Effect:   "NoExecute",
				Value:    "windows",
			},
		}...)
	} else {
		userID := int64(0)
		securityCtx = &corev1api.PodSecurityContext{
			RunAsUser: &userID,
		}

		nodeSelector[kube.NodeOSLabel] = kube.NodeOSLinux
		podOS.Name = kube.NodeOSLinux
	}

	pod := &corev1api.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      hostingPodName,
			Namespace: ownerObject.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: ownerObject.APIVersion,
					Kind:       ownerObject.Kind,
					Name:       ownerObject.Name,
					UID:        ownerObject.UID,
					Controller: boolptr.True(),
				},
			},
			Labels:      label,
			Annotations: annotation,
		},
		Spec: corev1api.PodSpec{
			NodeSelector: nodeSelector,
			OS:           &podOS,
			Containers: []corev1api.Container{
				{
					Name:  containerName,
					Image: podInfo.image,
					// PullNever: the image is expected to already be present
					// on the node (same image as the node-agent).
					ImagePullPolicy: corev1api.PullNever,
					Command:         command,
					Args:            args,
					VolumeMounts:    volumeMounts,
					Env:             podInfo.env,
					EnvFrom:         podInfo.envFrom,
					Resources:       resources,
				},
			},
			PriorityClassName:             priorityClassName,
			ServiceAccountName:            podInfo.serviceAccount,
			TerminationGracePeriodSeconds: &gracePeriod,
			Volumes:                       volumes,
			// Bind directly to the node of the client pod, bypassing the scheduler.
			NodeName:         selectedNode,
			RestartPolicy:    corev1api.RestartPolicyNever,
			SecurityContext:  securityCtx,
			Tolerations:      toleration,
			DNSPolicy:        podInfo.dnsPolicy,
			DNSConfig:        podInfo.dnsConfig,
			ImagePullSecrets: podInfo.imagePullSecrets,
		},
	}

	return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}