Add support for block volumes (#6680)

Signed-off-by: David Zaninovic <dzaninovic@catalogicsoftware.com>
pull/6750/head
David Zaninovic 2023-09-28 09:44:46 -04:00 committed by GitHub
parent a22f28e876
commit 8e01d1b9be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 616 additions and 193 deletions

View File

@ -0,0 +1 @@
Add support for block volumes with Kopia

View File

@ -49,6 +49,9 @@ spec:
- mountPath: /host_pods
mountPropagation: HostToContainer
name: host-pods
- mountPath: /var/lib/kubelet/plugins
mountPropagation: HostToContainer
name: host-plugins
- mountPath: /scratch
name: scratch
- mountPath: /credentials
@ -60,6 +63,9 @@ spec:
- hostPath:
path: /var/lib/kubelet/pods
name: host-pods
- hostPath:
path: /var/lib/kubelet/plugins
name: host-plugins
- emptyDir: {}
name: scratch
- name: cloud-credentials

View File

@ -703,33 +703,38 @@ type Provider interface {
In this case, we will extend the default kopia uploader to add the ability, when a given volume is for a block mode and is mapped as a device, we will use the [StreamingFile](https://pkg.go.dev/github.com/kopia/kopia@v0.13.0/fs#StreamingFile) to stream the device and backup to the kopia repository.
```go
func getLocalBlockEntry(kopiaEntry fs.Entry, log logrus.FieldLogger) (fs.Entry, error) {
path := kopiaEntry.LocalFilesystemPath()
fileInfo, err := os.Lstat(path)
func getLocalBlockEntry(sourcePath string) (fs.Entry, error) {
source, err := resolveSymlink(sourcePath)
if err != nil {
return nil, errors.Wrapf(err, "Unable to get the source device information %s", path)
return nil, errors.Wrap(err, "resolveSymlink")
}
fileInfo, err := os.Lstat(source)
if err != nil {
return nil, errors.Wrapf(err, "unable to get the source device information %s", source)
}
if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return nil, errors.Errorf("Source path %s is not a block device", path)
return nil, errors.Errorf("source path %s is not a block device", source)
}
device, err := os.Open(path)
device, err := os.Open(source)
if err != nil {
if os.IsPermission(err) || err.Error() == ErrNotPermitted {
return nil, errors.Wrapf(err, "No permission to open the source device %s, make sure that node agent is running in privileged mode", path)
return nil, errors.Wrapf(err, "no permission to open the source device %s, make sure that node agent is running in privileged mode", source)
}
return nil, errors.Wrapf(err, "Unable to open the source device %s", path)
return nil, errors.Wrapf(err, "unable to open the source device %s", source)
}
return virtualfs.StreamingFileFromReader(kopiaEntry.Name(), device), nil
sf := virtualfs.StreamingFileFromReader(source, device)
return virtualfs.NewStaticDirectory(source, []fs.Entry{sf}), nil
}
```
In the `pkg/uploader/kopia/snapshot.go` this is used in the Backup call like
```go
if volMode == PersistentVolumeFilesystem {
if volMode == uploader.PersistentVolumeFilesystem {
// to be consistent with restic when backup empty dir returns one error for upper logic handle
dirs, err := os.ReadDir(source)
if err != nil {
@ -742,15 +747,17 @@ In the `pkg/uploader/kopia/snapshot.go` this is used in the Backup call like
source = filepath.Clean(source)
...
sourceEntry, err := getLocalFSEntry(source)
if err != nil {
return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
}
var sourceEntry fs.Entry
if volMode == PersistentVolumeBlock {
sourceEntry, err = getLocalBlockEntry(sourceEntry, log)
if volMode == uploader.PersistentVolumeBlock {
sourceEntry, err = getLocalBlockEntry(source)
if err != nil {
return nil, false, errors.Wrap(err, "Unable to get local block device entry")
return nil, false, errors.Wrap(err, "unable to get local block device entry")
}
} else {
sourceEntry, err = getLocalFSEntry(source)
if err != nil {
return nil, false, errors.Wrap(err, "unable to get local filesystem entry")
}
}
@ -766,6 +773,8 @@ We only need to extend two functions the rest will be passed through.
```go
type BlockOutput struct {
*restore.FilesystemOutput
targetFileName string
}
var _ restore.Output = &BlockOutput{}
@ -773,30 +782,15 @@ var _ restore.Output = &BlockOutput{}
const bufferSize = 128 * 1024
func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remoteFile fs.File) error {
targetFileName, err := filepath.EvalSymlinks(o.TargetPath)
if err != nil {
return errors.Wrapf(err, "Unable to evaluate symlinks for %s", targetFileName)
}
fileInfo, err := os.Lstat(targetFileName)
if err != nil {
return errors.Wrapf(err, "Unable to get the target device information for %s", targetFileName)
}
if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return errors.Errorf("Target file %s is not a block device", targetFileName)
}
remoteReader, err := remoteFile.Open(ctx)
if err != nil {
return errors.Wrapf(err, "Failed to open remote file %s", remoteFile.Name())
return errors.Wrapf(err, "failed to open remote file %s", remoteFile.Name())
}
defer remoteReader.Close()
targetFile, err := os.Create(targetFileName)
targetFile, err := os.Create(o.targetFileName)
if err != nil {
return errors.Wrapf(err, "Failed to open file %s", targetFileName)
return errors.Wrapf(err, "failed to open file %s", o.targetFileName)
}
defer targetFile.Close()
@ -807,7 +801,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
bytesToWrite, err := remoteReader.Read(buffer)
if err != nil {
if err != io.EOF {
return errors.Wrapf(err, "Failed to read data from remote file %s", targetFileName)
return errors.Wrapf(err, "failed to read data from remote file %s", o.targetFileName)
}
readData = false
}
@ -819,7 +813,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
bytesToWrite -= bytesWritten
offset += bytesWritten
} else {
return errors.Wrapf(err, "Failed to write data to file %s", targetFileName)
return errors.Wrapf(err, "failed to write data to file %s", o.targetFileName)
}
}
}
@ -829,42 +823,43 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
}
func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error {
targetFileName, err := filepath.EvalSymlinks(o.TargetPath)
var err error
o.targetFileName, err = filepath.EvalSymlinks(o.TargetPath)
if err != nil {
return errors.Wrapf(err, "Unable to evaluate symlinks for %s", targetFileName)
return errors.Wrapf(err, "unable to evaluate symlinks for %s", o.targetFileName)
}
fileInfo, err := os.Lstat(targetFileName)
fileInfo, err := os.Lstat(o.targetFileName)
if err != nil {
return errors.Wrapf(err, "Unable to get the target device information for %s", o.TargetPath)
return errors.Wrapf(err, "unable to get the target device information for %s", o.TargetPath)
}
if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return errors.Errorf("Target file %s is not a block device", o.TargetPath)
return errors.Errorf("target file %s is not a block device", o.TargetPath)
}
return nil
}
```
Of note, we do need to add root access to the daemon set node agent to access the new mount.
Additional mount is required in the node-agent specification to resolve symlinks to the block devices from /host_pods/POD_ID/volumeDevices/kubernetes.io~csi directory.
```yaml
...
- mountPath: /var/lib/kubelet/plugins
mountPropagation: HostToContainer
name: host-plugins
....
- hostPath:
path: /var/lib/kubelet/plugins
name: host-plugins
```
...
Privileged mode is required to access the block devices in /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish directory as confirmed by testing on EKS and Minikube.
```yaml
SecurityContext: &corev1.SecurityContext{
Privileged: &c.privilegedAgent,
Privileged: &c.privilegedNodeAgent,
},
```
## Plugin Data Movers

View File

@ -95,6 +95,12 @@ func (b *PersistentVolumeBuilder) StorageClass(name string) *PersistentVolumeBui
return b
}
// VolumeMode sets the PersistentVolume's volume mode.
func (b *PersistentVolumeBuilder) VolumeMode(volMode corev1api.PersistentVolumeMode) *PersistentVolumeBuilder {
b.object.Spec.VolumeMode = &volMode
return b
}
// NodeAffinityRequired sets the PersistentVolume's NodeAffinity Requirement.
func (b *PersistentVolumeBuilder) NodeAffinityRequired(req *corev1api.NodeSelector) *PersistentVolumeBuilder {
b.object.Spec.NodeAffinity = &corev1api.VolumeNodeAffinity{

View File

@ -66,6 +66,7 @@ type Options struct {
BackupStorageConfig flag.Map
VolumeSnapshotConfig flag.Map
UseNodeAgent bool
PrivilegedNodeAgent bool
//TODO remove UseRestic when migration test out of using it
UseRestic bool
Wait bool
@ -110,6 +111,7 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.RestoreOnly, "restore-only", o.RestoreOnly, "Run the server in restore-only mode. Optional.")
flags.BoolVar(&o.DryRun, "dry-run", o.DryRun, "Generate resources, but don't send them to the cluster. Use with -o. Optional.")
flags.BoolVar(&o.UseNodeAgent, "use-node-agent", o.UseNodeAgent, "Create Velero node-agent daemonset. Optional. Velero node-agent hosts Velero modules that need to run in one or more nodes(i.e. Restic, Kopia).")
flags.BoolVar(&o.PrivilegedNodeAgent, "privileged-node-agent", o.PrivilegedNodeAgent, "Use privileged mode for the node agent. Optional. Required to backup block devices.")
flags.BoolVar(&o.Wait, "wait", o.Wait, "Wait for Velero deployment to be ready. Optional.")
flags.DurationVar(&o.DefaultRepoMaintenanceFrequency, "default-repo-maintain-frequency", o.DefaultRepoMaintenanceFrequency, "How often 'maintain' is run for backup repositories by default. Optional.")
flags.DurationVar(&o.GarbageCollectionFrequency, "garbage-collection-frequency", o.GarbageCollectionFrequency, "How often the garbage collection runs for expired backups.(default 1h)")
@ -198,6 +200,7 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) {
SecretData: secretData,
RestoreOnly: o.RestoreOnly,
UseNodeAgent: o.UseNodeAgent,
PrivilegedNodeAgent: o.PrivilegedNodeAgent,
UseVolumeSnapshots: o.UseVolumeSnapshots,
BSLConfig: o.BackupStorageConfig.Data(),
VSLConfig: o.VolumeSnapshotConfig.Data(),

View File

@ -177,7 +177,10 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}
exposeParam := r.setupExposeParam(du)
exposeParam, err := r.setupExposeParam(du)
if err != nil {
return r.errorOut(ctx, du, err, "failed to set exposer parameters", log)
}
// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
@ -735,18 +738,33 @@ func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string)
r.dataPathMgr.RemoveAsyncBR(duName)
}
func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) interface{} {
func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) (interface{}, error) {
if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
pvc := &corev1.PersistentVolumeClaim{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: du.Spec.SourceNamespace,
Name: du.Spec.SourcePVC,
}, pvc)
if err != nil {
return nil, errors.Wrapf(err, "failed to get PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
}
accessMode := exposer.AccessModeFileSystem
if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == corev1.PersistentVolumeBlock {
accessMode = exposer.AccessModeBlock
}
return &exposer.CSISnapshotExposeParam{
SnapshotName: du.Spec.CSISnapshot.VolumeSnapshot,
SourceNamespace: du.Spec.SourceNamespace,
StorageClass: du.Spec.CSISnapshot.StorageClass,
HostingPodLabels: map[string]string{velerov1api.DataUploadLabel: du.Name},
AccessMode: exposer.AccessModeFileSystem,
AccessMode: accessMode,
Timeout: du.Spec.OperationTimeout.Duration,
}
}, nil
}
return nil
return nil, nil
}
func (r *DataUploadReconciler) setupWaitExposePara(du *velerov2alpha1api.DataUpload) interface{} {

View File

@ -306,6 +306,7 @@ func TestReconcile(t *testing.T) {
name string
du *velerov2alpha1api.DataUpload
pod *corev1.Pod
pvc *corev1.PersistentVolumeClaim
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataMgr *datapath.Manager
expectedProcessed bool
@ -345,11 +346,21 @@ func TestReconcile(t *testing.T) {
}, {
name: "Dataupload should be accepted",
du: dataUploadBuilder().Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(),
pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
expectedRequeue: ctrl.Result{},
},
{
name: "Dataupload should fail to get PVC information",
du: dataUploadBuilder().Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "wrong-pvc"}).Result(),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "failed to get PVC",
},
{
name: "Dataupload should be prepared",
du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(),
@ -448,6 +459,11 @@ func TestReconcile(t *testing.T) {
require.NoError(t, err)
}
if test.pvc != nil {
err = r.client.Create(ctx, test.pvc)
require.NoError(t, err)
}
if test.dataMgr != nil {
r.dataPathMgr = test.dataMgr
} else {

View File

@ -133,10 +133,10 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
volMode := getPersistentVolumeMode(source)
go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, volMode, fs)
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull,
parentSnapshot, source.VolMode, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
@ -155,10 +155,8 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return errors.New("file system data path is not initialized")
}
volMode := getPersistentVolumeMode(target)
go func() {
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, volMode, fs)
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
@ -172,13 +170,6 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return nil
}
func getPersistentVolumeMode(source AccessPoint) uploader.PersistentVolumeMode {
if source.ByBlock != "" {
return uploader.PersistentVolumeBlock
}
return uploader.PersistentVolumeFilesystem
}
// UpdateProgress which implement ProgressUpdater interface to update progress status
func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) {
if fs.callbacks.OnProgress != nil {

View File

@ -53,7 +53,7 @@ type Callbacks struct {
// AccessPoint represents an access point that has been exposed to a data path instance
type AccessPoint struct {
ByPath string
ByBlock string
VolMode uploader.PersistentVolumeMode
}
// AsyncBR is the interface for asynchronous data path methods

View File

@ -233,9 +233,12 @@ func (e *csiSnapshotExposer) CleanUp(ctx context.Context, ownerObject corev1.Obj
}
func getVolumeModeByAccessMode(accessMode string) (corev1.PersistentVolumeMode, error) {
if accessMode == AccessModeFileSystem {
switch accessMode {
case AccessModeFileSystem:
return corev1.PersistentVolumeFilesystem, nil
} else {
case AccessModeBlock:
return corev1.PersistentVolumeBlock, nil
default:
return "", errors.Errorf("unsupported access mode %s", accessMode)
}
}
@ -356,6 +359,7 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co
}
var gracePeriod int64 = 0
volumeMounts, volumeDevices := kube.MakePodPVCAttachment(volumeName, backupPVC.Spec.VolumeMode)
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -379,10 +383,8 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co
Image: podInfo.image,
ImagePullPolicy: corev1.PullNever,
Command: []string{"/velero-helper", "pause"},
VolumeMounts: []corev1.VolumeMount{{
Name: volumeName,
MountPath: "/" + volumeName,
}},
VolumeMounts: volumeMounts,
VolumeDevices: volumeDevices,
},
},
ServiceAccountName: podInfo.serviceAccount,

View File

@ -82,7 +82,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.O
return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName)
}
restorePod, err := e.createRestorePod(ctx, ownerObject, hostingPodLabels, selectedNode)
restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, hostingPodLabels, selectedNode)
if err != nil {
return errors.Wrapf(err, "error to create restore pod")
}
@ -247,7 +247,8 @@ func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject co
return nil
}
func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, label map[string]string, selectedNode string) (*corev1.Pod, error) {
func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, targetPVC *corev1.PersistentVolumeClaim,
label map[string]string, selectedNode string) (*corev1.Pod, error) {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name
@ -260,6 +261,7 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec
}
var gracePeriod int64 = 0
volumeMounts, volumeDevices := kube.MakePodPVCAttachment(volumeName, targetPVC.Spec.VolumeMode)
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -283,10 +285,8 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec
Image: podInfo.image,
ImagePullPolicy: corev1.PullNever,
Command: []string{"/velero-helper", "pause"},
VolumeMounts: []corev1.VolumeMount{{
Name: volumeName,
MountPath: "/" + volumeName,
}},
VolumeMounts: volumeMounts,
VolumeDevices: volumeDevices,
},
},
ServiceAccountName: podInfo.serviceAccount,

View File

@ -26,11 +26,13 @@ import (
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
var getVolumeDirectory = kube.GetVolumeDirectory
var getVolumeMode = kube.GetVolumeMode
var singlePathMatch = kube.SinglePathMatch
// GetPodVolumeHostPath returns a path that can be accessed from the host for a given volume of a pod
@ -45,7 +47,17 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1.Pod, volumeName strin
logger.WithField("volDir", volDir).Info("Got volume dir")
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pod.GetUID()), volDir)
volMode, err := getVolumeMode(ctx, logger, pod, volumeName, cli)
if err != nil {
return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume mode for volume %s in pod %s", volumeName, pod.Name)
}
volSubDir := "volumes"
if volMode == uploader.PersistentVolumeBlock {
volSubDir = "volumeDevices"
}
pathGlob := fmt.Sprintf("/host_pods/%s/%s/*/%s", string(pod.GetUID()), volSubDir, volDir)
logger.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")
path, err := singlePathMatch(pathGlob, fs, logger)
@ -56,6 +68,7 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1.Pod, volumeName strin
logger.WithField("path", path).Info("Found path matching glob")
return datapath.AccessPoint{
ByPath: path,
ByPath: path,
VolMode: volMode,
}, nil
}

View File

@ -29,17 +29,19 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
func TestGetPodVolumeHostPath(t *testing.T) {
tests := []struct {
name string
getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error)
pathMatchFunc func(string, filesystem.Interface, logrus.FieldLogger) (string, error)
pod *corev1.Pod
pvc string
err string
name string
getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error)
getVolumeModeFunc func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error)
pathMatchFunc func(string, filesystem.Interface, logrus.FieldLogger) (string, error)
pod *corev1.Pod
pvc string
err string
}{
{
name: "get volume dir fail",
@ -55,6 +57,9 @@ func TestGetPodVolumeHostPath(t *testing.T) {
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error) {
return "", nil
},
getVolumeModeFunc: func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error) {
return uploader.PersistentVolumeFilesystem, nil
},
pathMatchFunc: func(string, filesystem.Interface, logrus.FieldLogger) (string, error) {
return "", errors.New("fake-error-2")
},
@ -62,6 +67,18 @@ func TestGetPodVolumeHostPath(t *testing.T) {
pvc: "fake-pvc-1",
err: "error identifying unique volume path on host for volume fake-pvc-1 in pod fake-pod-2: fake-error-2",
},
{
name: "get block volume dir success",
getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (
string, error) {
return "fake-pvc-1", nil
},
pathMatchFunc: func(string, filesystem.Interface, logrus.FieldLogger) (string, error) {
return "/host_pods/fake-pod-1-id/volumeDevices/kubernetes.io~csi/fake-pvc-1-id", nil
},
pod: builder.ForPod(velerov1api.DefaultNamespace, "fake-pod-1").Result(),
pvc: "fake-pvc-1",
},
}
for _, test := range tests {
@ -70,12 +87,18 @@ func TestGetPodVolumeHostPath(t *testing.T) {
getVolumeDirectory = test.getVolumeDirFunc
}
if test.getVolumeModeFunc != nil {
getVolumeMode = test.getVolumeModeFunc
}
if test.pathMatchFunc != nil {
singlePathMatch = test.pathMatchFunc
}
_, err := GetPodVolumeHostPath(context.Background(), test.pod, test.pvc, nil, nil, velerotest.NewLogger())
assert.EqualError(t, err, test.err)
if test.err != "" || err != nil {
assert.EqualError(t, err, test.err)
}
})
}
}

View File

@ -22,6 +22,7 @@ import (
const (
AccessModeFileSystem = "by-file-system"
AccessModeBlock = "by-block-device"
)
// ExposeResult defines the result of expose.

View File

@ -86,6 +86,14 @@ func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1.DaemonSet {
},
},
},
{
Name: "host-plugins",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/var/lib/kubelet/plugins",
},
},
},
{
Name: "scratch",
VolumeSource: corev1.VolumeSource{
@ -102,13 +110,20 @@ func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1.DaemonSet {
"/velero",
},
Args: daemonSetArgs,
SecurityContext: &corev1.SecurityContext{
Privileged: &c.privilegedNodeAgent,
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "host-pods",
MountPath: "/host_pods",
MountPropagation: &mountPropagationMode,
},
{
Name: "host-plugins",
MountPath: "/var/lib/kubelet/plugins",
MountPropagation: &mountPropagationMode,
},
{
Name: "scratch",
MountPath: "/scratch",

View File

@ -35,7 +35,7 @@ func TestDaemonSet(t *testing.T) {
ds = DaemonSet("velero", WithSecret(true))
assert.Equal(t, 7, len(ds.Spec.Template.Spec.Containers[0].Env))
assert.Equal(t, 3, len(ds.Spec.Template.Spec.Volumes))
assert.Equal(t, 4, len(ds.Spec.Template.Spec.Volumes))
ds = DaemonSet("velero", WithFeatures([]string{"foo,bar,baz"}))
assert.Len(t, ds.Spec.Template.Spec.Containers[0].Args, 3)

View File

@ -47,6 +47,7 @@ type podTemplateConfig struct {
serviceAccountName string
uploaderType string
defaultSnapshotMoveData bool
privilegedNodeAgent bool
}
func WithImage(image string) podTemplateOption {
@ -149,6 +150,12 @@ func WithServiceAccountName(sa string) podTemplateOption {
}
}
func WithPrivilegedNodeAgent() podTemplateOption {
return func(c *podTemplateConfig) {
c.privilegedNodeAgent = true
}
}
func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment {
// TODO: Add support for server args
c := &podTemplateConfig{

View File

@ -240,6 +240,7 @@ type VeleroOptions struct {
SecretData []byte
RestoreOnly bool
UseNodeAgent bool
PrivilegedNodeAgent bool
UseVolumeSnapshots bool
BSLConfig map[string]string
VSLConfig map[string]string
@ -374,6 +375,9 @@ func AllResources(o *VeleroOptions) *unstructured.UnstructuredList {
if len(o.Features) > 0 {
dsOpts = append(dsOpts, WithFeatures(o.Features))
}
if o.PrivilegedNodeAgent {
dsOpts = append(dsOpts, WithPrivilegedNodeAgent())
}
ds := DaemonSet(o.Namespace, dsOpts...)
if err := appendUnstructured(resources, ds); err != nil {
fmt.Printf("error appending DaemonSet %s: %s\n", ds.GetName(), err.Error())

View File

@ -200,10 +200,11 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
b.resultsLock.Unlock()
var (
errs []error
podVolumeBackups []*velerov1api.PodVolumeBackup
podVolumes = make(map[string]corev1api.Volume)
mountedPodVolumes = sets.String{}
errs []error
podVolumeBackups []*velerov1api.PodVolumeBackup
podVolumes = make(map[string]corev1api.Volume)
mountedPodVolumes = sets.String{}
attachedPodDevices = sets.String{}
)
pvcSummary := NewPVCBackupSummary()
@ -233,6 +234,9 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
for _, volumeMount := range container.VolumeMounts {
mountedPodVolumes.Insert(volumeMount.Name)
}
for _, volumeDevice := range container.VolumeDevices {
attachedPodDevices.Insert(volumeDevice.Name)
}
}
repoIdentifier := ""
@ -268,6 +272,15 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
continue
}
// check if volume is a block volume
if attachedPodDevices.Has(volumeName) {
msg := fmt.Sprintf("volume %s declared in pod %s/%s is a block volume. Block volumes are not supported for fs backup, skipping",
volumeName, pod.Namespace, pod.Name)
log.Warn(msg)
pvcSummary.addSkipped(volumeName, msg)
continue
}
// volumes that are not mounted by any container should not be backed up, because
// its directory is not created
if !mountedPodVolumes.Has(volumeName) {

View File

@ -0,0 +1,55 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
import (
"os"
"syscall"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/pkg/errors"
)
const ErrNotPermitted = "operation not permitted"
func getLocalBlockEntry(sourcePath string) (fs.Entry, error) {
source, err := resolveSymlink(sourcePath)
if err != nil {
return nil, errors.Wrap(err, "resolveSymlink")
}
fileInfo, err := os.Lstat(source)
if err != nil {
return nil, errors.Wrapf(err, "unable to get the source device information %s", source)
}
if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return nil, errors.Errorf("source path %s is not a block device", source)
}
device, err := os.Open(source)
if err != nil {
if os.IsPermission(err) || err.Error() == ErrNotPermitted {
return nil, errors.Wrapf(err, "no permission to open the source device %s, make sure that node agent is running in privileged mode", source)
}
return nil, errors.Wrapf(err, "unable to open the source device %s", source)
}
sf := virtualfs.StreamingFileFromReader(source, device)
return virtualfs.NewStaticDirectory(source, []fs.Entry{sf}), nil
}

View File

@ -0,0 +1,99 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
import (
"context"
"io"
"os"
"path/filepath"
"syscall"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/snapshot/restore"
"github.com/pkg/errors"
)
type BlockOutput struct {
*restore.FilesystemOutput
targetFileName string
}
var _ restore.Output = &BlockOutput{}
const bufferSize = 128 * 1024
func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remoteFile fs.File) error {
remoteReader, err := remoteFile.Open(ctx)
if err != nil {
return errors.Wrapf(err, "failed to open remote file %s", remoteFile.Name())
}
defer remoteReader.Close()
targetFile, err := os.Create(o.targetFileName)
if err != nil {
return errors.Wrapf(err, "failed to open file %s", o.targetFileName)
}
defer targetFile.Close()
buffer := make([]byte, bufferSize)
readData := true
for readData {
bytesToWrite, err := remoteReader.Read(buffer)
if err != nil {
if err != io.EOF {
return errors.Wrapf(err, "failed to read data from remote file %s", o.targetFileName)
}
readData = false
}
if bytesToWrite > 0 {
offset := 0
for bytesToWrite > 0 {
if bytesWritten, err := targetFile.Write(buffer[offset:bytesToWrite]); err == nil {
bytesToWrite -= bytesWritten
offset += bytesWritten
} else {
return errors.Wrapf(err, "failed to write data to file %s", o.targetFileName)
}
}
}
}
return nil
}
func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error {
var err error
o.targetFileName, err = filepath.EvalSymlinks(o.TargetPath)
if err != nil {
return errors.Wrapf(err, "unable to evaluate symlinks for %s", o.targetFileName)
}
fileInfo, err := os.Lstat(o.targetFileName)
if err != nil {
return errors.Wrapf(err, "unable to get the target device information for %s", o.TargetPath)
}
if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return errors.Errorf("target file %s is not a block device", o.TargetPath)
}
return nil
}

View File

@ -131,25 +131,22 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re
if fsUploader == nil {
return nil, false, errors.New("get empty kopia uploader")
}
if volMode == uploader.PersistentVolumeBlock {
return nil, false, errors.New("unable to handle block storage")
}
dir, err := filepath.Abs(sourcePath)
source, err := filepath.Abs(sourcePath)
if err != nil {
return nil, false, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)
}
// to be consistent with restic when backup empty dir returns one error for upper logic handle
dirs, err := os.ReadDir(dir)
if err != nil {
return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", dir)
} else if len(dirs) == 0 {
return nil, true, nil
if volMode == uploader.PersistentVolumeFilesystem {
// to be consistent with restic when backup empty dir returns one error for upper logic handle
dirs, err := os.ReadDir(source)
if err != nil {
return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", source)
} else if len(dirs) == 0 {
return nil, true, nil
}
}
dir = filepath.Clean(dir)
source = filepath.Clean(source)
sourceInfo := snapshot.SourceInfo{
UserName: udmrepo.GetRepoUser(),
@ -157,16 +154,25 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re
Path: filepath.Clean(realSource),
}
if realSource == "" {
sourceInfo.Path = dir
sourceInfo.Path = source
}
rootDir, err := getLocalFSEntry(dir)
if err != nil {
return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
var sourceEntry fs.Entry
if volMode == uploader.PersistentVolumeBlock {
sourceEntry, err = getLocalBlockEntry(source)
if err != nil {
return nil, false, errors.Wrap(err, "unable to get local block device entry")
}
} else {
sourceEntry, err = getLocalFSEntry(source)
if err != nil {
return nil, false, errors.Wrap(err, "unable to get local filesystem entry")
}
}
kopiaCtx := kopia.SetupKopiaLog(ctx, log)
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, rootDir, forceFull, parentSnapshot, tags, log, "Kopia Uploader")
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, log, "Kopia Uploader")
if err != nil {
return nil, false, err
}
@ -348,7 +354,8 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour
}
// Restore restore specific sourcePath with given snapshotID and update progress
func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode,
log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
log.Info("Start to restore...")
kopiaCtx := kopia.SetupKopiaLog(ctx, log)
@ -370,7 +377,7 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
return 0, 0, errors.Wrapf(err, "Unable to resolve path %v", dest)
}
output := &restore.FilesystemOutput{
fsOutput := &restore.FilesystemOutput{
TargetPath: path,
OverwriteDirectories: true,
OverwriteFiles: true,
@ -378,11 +385,18 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
IgnorePermissionErrors: true,
}
err = output.Init(ctx)
err = fsOutput.Init(ctx)
if err != nil {
return 0, 0, errors.Wrap(err, "error to init output")
}
var output restore.Output = fsOutput
if volMode == uploader.PersistentVolumeBlock {
output = &BlockOutput{
FilesystemOutput: fsOutput,
}
}
stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{
Parallel: runtime.NumCPU(),
RestoreDirEntryAtDepth: math.MaxInt32,

View File

@ -23,6 +23,7 @@ import (
"time"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
@ -594,11 +595,11 @@ func TestBackup(t *testing.T) {
expectedError: errors.New("Unable to read dir"),
},
{
name: "Unable to handle block mode",
name: "Source path is not a block device",
sourcePath: "/",
tags: nil,
volMode: uploader.PersistentVolumeBlock,
expectedError: errors.New("unable to handle block storage"),
expectedError: errors.New("source path / is not a block device"),
},
}
@ -660,6 +661,7 @@ func TestRestore(t *testing.T) {
expectedBytes int64
expectedCount int32
expectedError error
volMode uploader.PersistentVolumeMode
}
// Define test cases
@ -697,6 +699,46 @@ func TestRestore(t *testing.T) {
snapshotID: "snapshot-123",
expectedError: nil,
},
{
name: "Expect block volume successful",
filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
},
restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
return restore.Stats{}, nil
},
snapshotID: "snapshot-123",
expectedError: nil,
volMode: uploader.PersistentVolumeBlock,
},
{
name: "Unable to evaluate symlinks for block volume",
filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
},
restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
err := output.BeginDirectory(ctx, "fake-dir", virtualfs.NewStaticDirectory("fake-dir", nil))
return restore.Stats{}, err
},
snapshotID: "snapshot-123",
expectedError: errors.New("unable to evaluate symlinks for"),
volMode: uploader.PersistentVolumeBlock,
dest: "/wrong-dest",
},
{
name: "Target file is not a block device",
filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
},
restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
err := output.BeginDirectory(ctx, "fake-dir", virtualfs.NewStaticDirectory("fake-dir", nil))
return restore.Stats{}, err
},
snapshotID: "snapshot-123",
expectedError: errors.New("target file /tmp is not a block device"),
volMode: uploader.PersistentVolumeBlock,
dest: "/tmp",
},
}
em := &manifest.EntryMetadata{
@ -706,6 +748,10 @@ func TestRestore(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem
}
if tc.invalidManifestType {
em.Labels[manifest.TypeLabelKey] = ""
} else {
@ -725,7 +771,7 @@ func TestRestore(t *testing.T) {
repoWriterMock.On("OpenObject", mock.Anything, mock.Anything).Return(em, nil)
progress := new(Progress)
bytesRestored, fileCount, err := Restore(context.Background(), repoWriterMock, progress, tc.snapshotID, tc.dest, logrus.New(), nil)
bytesRestored, fileCount, err := Restore(context.Background(), repoWriterMock, progress, tc.snapshotID, tc.dest, tc.volMode, logrus.New(), nil)
// Check if the returned error matches the expected error
if tc.expectedError != nil {

View File

@ -128,11 +128,6 @@ func (kp *kopiaProvider) RunBackup(
return "", false, errors.New("path is empty")
}
// For now, error on block mode
if volMode == uploader.PersistentVolumeBlock {
return "", false, errors.New("unable to currently support block mode")
}
log := kp.log.WithFields(logrus.Fields{
"path": path,
"realSource": realSource,
@ -214,10 +209,6 @@ func (kp *kopiaProvider) RunRestore(
"volumePath": volumePath,
})
if volMode == uploader.PersistentVolumeBlock {
return errors.New("unable to currently support block mode")
}
repoWriter := kopia.NewShimRepo(kp.bkRepo)
progress := new(kopia.Progress)
progress.InitThrottle(restoreProgressCheckInterval)
@ -235,7 +226,7 @@ func (kp *kopiaProvider) RunRestore(
// We use the cancel channel to control the restore cancel, so don't pass a context with cancel to Kopia restore.
// Otherwise, Kopia restore will not response to the cancel control but return an arbitrary error.
// Kopia restore cancel is not designed as well as Kopia backup which uses the context to control backup cancel all the way.
size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, log, restoreCancel)
size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, volMode, log, restoreCancel)
if err != nil {
return errors.Wrapf(err, "Failed to run kopia restore")

View File

@ -94,12 +94,12 @@ func TestRunBackup(t *testing.T) {
notError: false,
},
{
name: "error on vol mode",
name: "success to backup block mode volume",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, nil
return &uploader.SnapshotInfo{}, false, nil
},
volMode: uploader.PersistentVolumeBlock,
notError: false,
notError: true,
},
}
for _, tc := range testCases {
@ -125,31 +125,31 @@ func TestRunRestore(t *testing.T) {
testCases := []struct {
name string
hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error)
hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error)
notError bool
volMode uploader.PersistentVolumeMode
}{
{
name: "normal restore",
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
return 0, 0, nil
},
notError: true,
},
{
name: "failed to restore",
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
return 0, 0, errors.New("failed to restore")
},
notError: false,
},
{
name: "failed to restore block mode",
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
return 0, 0, errors.New("failed to restore")
name: "normal block mode restore",
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
return 0, 0, nil
},
volMode: uploader.PersistentVolumeBlock,
notError: false,
notError: true,
},
}

View File

@ -212,6 +212,9 @@ func TestResticRunRestore(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem
}
resticRestoreCMDFunc = tc.hookResticRestoreFunc
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem

View File

@ -316,3 +316,23 @@ func WaitPVBound(ctx context.Context, pvGetter corev1client.CoreV1Interface, pvN
func IsPVCBound(pvc *corev1api.PersistentVolumeClaim) bool {
return pvc.Spec.VolumeName != ""
}
// MakePodPVCAttachment returns the volume mounts and devices for a pod needed to attach a PVC
func MakePodPVCAttachment(volumeName string, volumeMode *corev1api.PersistentVolumeMode) ([]corev1api.VolumeMount, []corev1api.VolumeDevice) {
var volumeMounts []corev1api.VolumeMount = nil
var volumeDevices []corev1api.VolumeDevice = nil
if volumeMode != nil && *volumeMode == corev1api.PersistentVolumeBlock {
volumeDevices = []corev1api.VolumeDevice{{
Name: volumeName,
DevicePath: "/" + volumeName,
}}
} else {
volumeMounts = []corev1api.VolumeMount{{
Name: volumeName,
MountPath: "/" + volumeName,
}}
}
return volumeMounts, volumeDevices
}

View File

@ -35,6 +35,7 @@ import (
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
@ -50,6 +51,8 @@ const (
KubeAnnSelectedNode = "volume.kubernetes.io/selected-node"
)
var ErrorPodVolumeIsNotPVC = errors.New("pod volume is not a PVC")
// NamespaceAndName returns a string in the format <namespace>/<name>
func NamespaceAndName(objMeta metav1.Object) string {
if objMeta.GetNamespace() == "" {
@ -122,6 +125,57 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core
// where the specified volume lives.
// For volumes with a CSIVolumeSource, append "/mount" to the directory name.
func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (string, error) {
pvc, pv, volume, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli)
if err != nil {
// This case implies the administrator created the PV and attached it directly, without PVC.
// Note that only one VolumeSource can be populated per Volume on a pod
if err == ErrorPodVolumeIsNotPVC {
if volume.VolumeSource.CSI != nil {
return volume.Name + "/mount", nil
}
return volume.Name, nil
}
return "", errors.WithStack(err)
}
// Most common case is that we have a PVC VolumeSource, and we need to check the PV it points to for a CSI source.
// PV's been created with a CSI source.
isProvisionedByCSI, err := isProvisionedByCSI(log, pv, cli)
if err != nil {
return "", errors.WithStack(err)
}
if isProvisionedByCSI {
if pv.Spec.VolumeMode != nil && *pv.Spec.VolumeMode == corev1api.PersistentVolumeBlock {
return pvc.Spec.VolumeName, nil
}
return pvc.Spec.VolumeName + "/mount", nil
}
return pvc.Spec.VolumeName, nil
}
// GetVolumeMode gets the uploader.PersistentVolumeMode of the volume.
func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (
uploader.PersistentVolumeMode, error) {
_, pv, _, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli)
if err != nil {
if err == ErrorPodVolumeIsNotPVC {
return uploader.PersistentVolumeFilesystem, nil
}
return "", errors.WithStack(err)
}
if pv.Spec.VolumeMode != nil && *pv.Spec.VolumeMode == corev1api.PersistentVolumeBlock {
return uploader.PersistentVolumeBlock, nil
}
return uploader.PersistentVolumeFilesystem, nil
}
// GetPodPVCVolume gets the PVC, PV and volume for a pod volume name.
// Returns pod volume in case of ErrorPodVolumeIsNotPVC error
func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (
*corev1api.PersistentVolumeClaim, *corev1api.PersistentVolume, *corev1api.Volume, error) {
var volume *corev1api.Volume
for i := range pod.Spec.Volumes {
@ -132,41 +186,26 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1
}
if volume == nil {
return "", errors.New("volume not found in pod")
return nil, nil, nil, errors.New("volume not found in pod")
}
// This case implies the administrator created the PV and attached it directly, without PVC.
// Note that only one VolumeSource can be populated per Volume on a pod
if volume.VolumeSource.PersistentVolumeClaim == nil {
if volume.VolumeSource.CSI != nil {
return volume.Name + "/mount", nil
}
return volume.Name, nil
return nil, nil, volume, ErrorPodVolumeIsNotPVC // There is a pod volume but it is not a PVC
}
// Most common case is that we have a PVC VolumeSource, and we need to check the PV it points to for a CSI source.
pvc := &corev1api.PersistentVolumeClaim{}
err := cli.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.VolumeSource.PersistentVolumeClaim.ClaimName}, pvc)
if err != nil {
return "", errors.WithStack(err)
return nil, nil, nil, errors.WithStack(err)
}
pv := &corev1api.PersistentVolume{}
err = cli.Get(ctx, client.ObjectKey{Name: pvc.Spec.VolumeName}, pv)
if err != nil {
return "", errors.WithStack(err)
return nil, nil, nil, errors.WithStack(err)
}
// PV's been created with a CSI source.
isProvisionedByCSI, err := isProvisionedByCSI(log, pv, cli)
if err != nil {
return "", errors.WithStack(err)
}
if isProvisionedByCSI {
return pvc.Spec.VolumeName + "/mount", nil
}
return pvc.Spec.VolumeName, nil
return pvc, pv, volume, nil
}
// isProvisionedByCSI function checks whether this is a CSI PV by annotation.

View File

@ -38,6 +38,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/builder"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
)
func TestNamespaceAndName(t *testing.T) {
@ -164,6 +165,13 @@ func TestGetVolumeDirectorySuccess(t *testing.T) {
pv: builder.ForPersistentVolume("a-pv").CSI("csi.test.com", "provider-volume-id").Result(),
want: "a-pv/mount",
},
{
name: "Block CSI volume with a PVC/PV does not append '/mount' to the volume name",
pod: builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").PersistentVolumeClaimSource("my-pvc").Result()).Result(),
pvc: builder.ForPersistentVolumeClaim("ns-1", "my-pvc").VolumeName("a-pv").Result(),
pv: builder.ForPersistentVolume("a-pv").CSI("csi.test.com", "provider-volume-id").VolumeMode(corev1.PersistentVolumeBlock).Result(),
want: "a-pv",
},
{
name: "CSI volume mounted without a PVC appends '/mount' to the volume name",
pod: builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").CSISource("csi.test.com").Result()).Result(),
@ -211,6 +219,54 @@ func TestGetVolumeDirectorySuccess(t *testing.T) {
}
}
// TestGetVolumeModeSuccess tests the GetVolumeMode function
func TestGetVolumeModeSuccess(t *testing.T) {
tests := []struct {
name string
pod *corev1.Pod
pvc *corev1.PersistentVolumeClaim
pv *corev1.PersistentVolume
want uploader.PersistentVolumeMode
}{
{
name: "Filesystem PVC volume",
pod: builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").PersistentVolumeClaimSource("my-pvc").Result()).Result(),
pvc: builder.ForPersistentVolumeClaim("ns-1", "my-pvc").VolumeName("a-pv").Result(),
pv: builder.ForPersistentVolume("a-pv").VolumeMode(corev1.PersistentVolumeFilesystem).Result(),
want: uploader.PersistentVolumeFilesystem,
},
{
name: "Block PVC volume",
pod: builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").PersistentVolumeClaimSource("my-pvc").Result()).Result(),
pvc: builder.ForPersistentVolumeClaim("ns-1", "my-pvc").VolumeName("a-pv").Result(),
pv: builder.ForPersistentVolume("a-pv").VolumeMode(corev1.PersistentVolumeBlock).Result(),
want: uploader.PersistentVolumeBlock,
},
{
name: "Pod volume without a PVC",
pod: builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").Result()).Result(),
want: uploader.PersistentVolumeFilesystem,
},
}
for _, tc := range tests {
clientBuilder := fake.NewClientBuilder()
if tc.pvc != nil {
clientBuilder = clientBuilder.WithObjects(tc.pvc)
}
if tc.pv != nil {
clientBuilder = clientBuilder.WithObjects(tc.pv)
}
// Function under test
mode, err := GetVolumeMode(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build())
require.NoError(t, err)
assert.Equal(t, tc.want, mode)
}
}
func TestIsV1Beta1CRDReady(t *testing.T) {
tests := []struct {
name string

View File

@ -75,24 +75,10 @@ To mount the correct hostpath to pods volumes, run the node-agent pod in `privil
oc adm policy add-scc-to-user privileged -z velero -n velero
```
2. Modify the DaemonSet yaml to request a privileged mode:
```diff
@@ -67,3 +67,5 @@ spec:
value: /credentials/cloud
- name: VELERO_SCRATCH_DIR
value: /scratch
+ securityContext:
+ privileged: true
2. Install Velero with the '--privileged-node-agent' option to request a privileged mode:
```
or
```shell
oc patch ds/node-agent \
--namespace velero \
--type json \
-p '[{"op":"add","path":"/spec/template/spec/containers/0/securityContext","value": { "privileged": true}}]'
velero install --use-node-agent --privileged-node-agent
```
If node-agent is not running in a privileged mode, it will not be able to access snapshot volumes within the mounted

View File

@ -23,6 +23,14 @@ By default, `velero install` does not install Velero's [File System Backup][3].
If you've already run `velero install` without the `--use-node-agent` flag, you can run the same command again, including the `--use-node-agent` flag, to add the file system backup to your existing install.
## CSI Snapshot Data Movement
Velero node-agent is required by CSI snapshot data movement when Velero built-in data mover is used. By default, `velero install` does not install Velero's node-agent. To enable it, specify the `--use-node-agent` flag.
For some use cases, Velero node-agent requires to run under privileged mode. For example, when backing up block volumes, it is required to allow the node-agent to access the block device. To enable it set velero install flags `--privileged-node-agent`.
If you've already run `velero install` without the `--use-node-agent` or `--privileged-node-agent` flag, you can run the same command again, including the `--use-node-agent` or `--privileged-node-agent` flag, to add CSI snapshot data movement to your existing install.
## Default Pod Volume backup to file system backup
By default, `velero install` does not enable the use of File System Backup (FSB) to take backups of all pod volumes. You must apply an [annotation](file-system-backup.md/#using-opt-in-pod-volume-backup) to every pod which contains volumes for Velero to use FSB for the backup.

View File

@ -111,24 +111,10 @@ To mount the correct hostpath to pods volumes, run the node-agent pod in `privil
oc adm policy add-scc-to-user privileged -z velero -n velero
```
2. Modify the DaemonSet yaml to request a privileged mode:
```diff
@@ -67,3 +67,5 @@ spec:
value: /credentials/cloud
- name: VELERO_SCRATCH_DIR
value: /scratch
+ securityContext:
+ privileged: true
2. Install Velero with the '--privileged-node-agent' option to request a privileged mode:
```
or
```shell
oc patch ds/node-agent \
--namespace velero \
--type json \
-p '[{"op":"add","path":"/spec/template/spec/containers/0/securityContext","value": { "privileged": true}}]'
velero install --use-node-agent --privileged-node-agent
```

View File

@ -49,6 +49,9 @@ spec:
- mountPath: /host_pods
mountPropagation: HostToContainer
name: host-pods
- mountPath: /var/lib/kubelet/plugins
mountPropagation: HostToContainer
name: host-plugins
- mountPath: /scratch
name: scratch
- mountPath: /credentials
@ -60,6 +63,9 @@ spec:
- hostPath:
path: /var/lib/kubelet/pods
name: host-pods
- hostPath:
path: /var/lib/kubelet/plugins
name: host-plugins
- emptyDir: {}
name: scratch
- name: cloud-credentials