diff --git a/changelogs/unreleased/6680-dzaninovic b/changelogs/unreleased/6680-dzaninovic
new file mode 100644
index 000000000..b4d735d14
--- /dev/null
+++ b/changelogs/unreleased/6680-dzaninovic
@@ -0,0 +1 @@
+Add support for block volumes with Kopia
\ No newline at end of file
diff --git a/design/CLI/PoC/overlays/plugins/node-agent.yaml b/design/CLI/PoC/overlays/plugins/node-agent.yaml
index dbb4ce18d..0db26e2c5 100644
--- a/design/CLI/PoC/overlays/plugins/node-agent.yaml
+++ b/design/CLI/PoC/overlays/plugins/node-agent.yaml
@@ -49,6 +49,9 @@ spec:
             - mountPath: /host_pods
               mountPropagation: HostToContainer
               name: host-pods
+            - mountPath: /var/lib/kubelet/plugins
+              mountPropagation: HostToContainer
+              name: host-plugins
             - mountPath: /scratch
               name: scratch
             - mountPath: /credentials
@@ -60,6 +63,9 @@ spec:
         - hostPath:
             path: /var/lib/kubelet/pods
           name: host-pods
+        - hostPath:
+            path: /var/lib/kubelet/plugins
+          name: host-plugins
         - emptyDir: {}
           name: scratch
         - name: cloud-credentials
diff --git a/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md b/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md
index d006b29e2..6a88b8b48 100644
--- a/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md
+++ b/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md
@@ -703,33 +703,38 @@ type Provider interface {
 In this case, we will extend the default kopia uploader to add the ability, when a given volume is for a block mode and is mapped as a device, we will use the [StreamingFile](https://pkg.go.dev/github.com/kopia/kopia@v0.13.0/fs#StreamingFile) to stream the device and backup to the kopia repository. 
 
 ```go
-func getLocalBlockEntry(kopiaEntry fs.Entry, log logrus.FieldLogger) (fs.Entry, error) {
-	path := kopiaEntry.LocalFilesystemPath()
-
-	fileInfo, err := os.Lstat(path)
+func getLocalBlockEntry(sourcePath string) (fs.Entry, error) {
+	source, err := resolveSymlink(sourcePath)
 	if err != nil {
-		return nil, errors.Wrapf(err, "Unable to get the source device information %s", path)
+		return nil, errors.Wrap(err, "resolveSymlink")
+	}
+
+	fileInfo, err := os.Lstat(source)
+	if err != nil {
+		return nil, errors.Wrapf(err, "unable to get the source device information %s", source)
 	}
 
 	if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
-		return nil, errors.Errorf("Source path %s is not a block device", path)
+		return nil, errors.Errorf("source path %s is not a block device", source)
 	}
-  	device, err := os.Open(path)
+
+	device, err := os.Open(source)
 	if err != nil {
 		if os.IsPermission(err) || err.Error() == ErrNotPermitted {
-			return nil, errors.Wrapf(err, "No permission to open the source device %s, make sure that node agent is running in privileged mode", path)
+			return nil, errors.Wrapf(err, "no permission to open the source device %s, make sure that node agent is running in privileged mode", source)
 		}
-		return nil, errors.Wrapf(err, "Unable to open the source device %s", path)
+		return nil, errors.Wrapf(err, "unable to open the source device %s", source)
 	}
-	return virtualfs.StreamingFileFromReader(kopiaEntry.Name(), device), nil
 
+	sf := virtualfs.StreamingFileFromReader(source, device)
+	return virtualfs.NewStaticDirectory(source, []fs.Entry{sf}), nil
 }
 ```
 
 In the `pkg/uploader/kopia/snapshot.go` this is used in the Backup call like
 
 ```go
-	if volMode == PersistentVolumeFilesystem {
+	if volMode == uploader.PersistentVolumeFilesystem {
 		// to be consistent with restic when backup empty dir returns one error for upper logic handle
 		dirs, err := os.ReadDir(source)
 		if err != nil {
@@ -742,15 +747,17 @@ In the `pkg/uploader/kopia/snapshot.go` this is used in the Backup call like
   source = filepath.Clean(source)
   ...
 
-  sourceEntry, err := getLocalFSEntry(source)
-	if err != nil {
-		return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
-	}
+	var sourceEntry fs.Entry
 
-  if volMode == PersistentVolumeBlock {
-		sourceEntry, err = getLocalBlockEntry(sourceEntry, log)
+	if volMode == uploader.PersistentVolumeBlock {
+		sourceEntry, err = getLocalBlockEntry(source)
 		if err != nil {
-			return nil, false, errors.Wrap(err, "Unable to get local block device entry")
+			return nil, false, errors.Wrap(err, "unable to get local block device entry")
+		}
+	} else {
+		sourceEntry, err = getLocalFSEntry(source)
+		if err != nil {
+			return nil, false, errors.Wrap(err, "unable to get local filesystem entry")
 		}
 	}
 
@@ -766,6 +773,8 @@ We only need to extend two functions the rest will be passed through.
 ```go
 type BlockOutput struct {
 	*restore.FilesystemOutput
+
+	targetFileName string
 }
 
 var _ restore.Output = &BlockOutput{}
@@ -773,30 +782,15 @@ var _ restore.Output = &BlockOutput{}
 const bufferSize = 128 * 1024
 
 func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remoteFile fs.File) error {
-
-	targetFileName, err := filepath.EvalSymlinks(o.TargetPath)
-	if err != nil {
-		return errors.Wrapf(err, "Unable to evaluate symlinks for %s", targetFileName)
-	}
-
-	fileInfo, err := os.Lstat(targetFileName)
-	if err != nil {
-		return errors.Wrapf(err, "Unable to get the target device information for %s", targetFileName)
-	}
-
-	if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
-		return errors.Errorf("Target file %s is not a block device", targetFileName)
-	}
-
 	remoteReader, err := remoteFile.Open(ctx)
 	if err != nil {
-		return errors.Wrapf(err, "Failed to open remote file %s", remoteFile.Name())
+		return errors.Wrapf(err, "failed to open remote file %s", remoteFile.Name())
 	}
 	defer remoteReader.Close()
 
-	targetFile, err := os.Create(targetFileName)
+	targetFile, err := os.Create(o.targetFileName)
 	if err != nil {
-		return errors.Wrapf(err, "Failed to open file %s", targetFileName)
+		return errors.Wrapf(err, "failed to open file %s", o.targetFileName)
 	}
 	defer targetFile.Close()
 
@@ -807,7 +801,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
 		bytesToWrite, err := remoteReader.Read(buffer)
 		if err != nil {
 			if err != io.EOF {
-				return errors.Wrapf(err, "Failed to read data from remote file %s", targetFileName)
+				return errors.Wrapf(err, "failed to read data from remote file %s", o.targetFileName)
 			}
 			readData = false
 		}
@@ -819,7 +813,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
 					bytesToWrite -= bytesWritten
 					offset += bytesWritten
 				} else {
-					return errors.Wrapf(err, "Failed to write data to file %s", targetFileName)
+					return errors.Wrapf(err, "failed to write data to file %s", o.targetFileName)
 				}
 			}
 		}
@@ -829,42 +823,43 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
 }
 
 func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error {
-	targetFileName, err := filepath.EvalSymlinks(o.TargetPath)
+	var err error
+	o.targetFileName, err = filepath.EvalSymlinks(o.TargetPath)
 	if err != nil {
-		return errors.Wrapf(err, "Unable to evaluate symlinks for %s", targetFileName)
+		return errors.Wrapf(err, "unable to evaluate symlinks for %s", o.targetFileName)
 	}
 
-	fileInfo, err := os.Lstat(targetFileName)
+	fileInfo, err := os.Lstat(o.targetFileName)
 	if err != nil {
-		return errors.Wrapf(err, "Unable to get the target device information for %s", o.TargetPath)
+		return errors.Wrapf(err, "unable to get the target device information for %s", o.TargetPath)
 	}
 
 	if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
-		return errors.Errorf("Target file %s is not a block device", o.TargetPath)
+		return errors.Errorf("target file %s is not a block device", o.TargetPath)
 	}
 
 	return nil
 }
 ```
 
-Of note, we do need to add root access to the daemon set node agent to access the new mount. 
+Additional mount is required in the node-agent specification to resolve symlinks to the block devices from /host_pods/POD_ID/volumeDevices/kubernetes.io~csi directory.
 
 ```yaml
-...
             - mountPath: /var/lib/kubelet/plugins
               mountPropagation: HostToContainer
               name: host-plugins
-
 ....
         - hostPath:
             path: /var/lib/kubelet/plugins
           name: host-plugins
+```
 
-...
+Privileged mode is required to access the block devices in /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish directory as confirmed by testing on EKS and Minikube.
+
+```yaml
 							SecurityContext: &corev1.SecurityContext{
-								Privileged: &c.privilegedAgent,
+								Privileged: &c.privilegedNodeAgent,
 							},
-
 ```
 
 ## Plugin Data Movers
diff --git a/pkg/builder/persistent_volume_builder.go b/pkg/builder/persistent_volume_builder.go
index 5fee88c19..4cf2e47f2 100644
--- a/pkg/builder/persistent_volume_builder.go
+++ b/pkg/builder/persistent_volume_builder.go
@@ -95,6 +95,12 @@ func (b *PersistentVolumeBuilder) StorageClass(name string) *PersistentVolumeBui
 	return b
 }
 
+// VolumeMode sets the PersistentVolume's volume mode.
+func (b *PersistentVolumeBuilder) VolumeMode(volMode corev1api.PersistentVolumeMode) *PersistentVolumeBuilder {
+	b.object.Spec.VolumeMode = &volMode
+	return b
+}
+
 // NodeAffinityRequired sets the PersistentVolume's NodeAffinity Requirement.
 func (b *PersistentVolumeBuilder) NodeAffinityRequired(req *corev1api.NodeSelector) *PersistentVolumeBuilder {
 	b.object.Spec.NodeAffinity = &corev1api.VolumeNodeAffinity{
diff --git a/pkg/cmd/cli/install/install.go b/pkg/cmd/cli/install/install.go
index 25b7d0999..72fde9ef4 100644
--- a/pkg/cmd/cli/install/install.go
+++ b/pkg/cmd/cli/install/install.go
@@ -66,6 +66,7 @@ type Options struct {
 	BackupStorageConfig       flag.Map
 	VolumeSnapshotConfig      flag.Map
 	UseNodeAgent              bool
+	PrivilegedNodeAgent       bool
 	//TODO remove UseRestic when migration test out of using it
 	UseRestic                       bool
 	Wait                            bool
@@ -110,6 +111,7 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) {
 	flags.BoolVar(&o.RestoreOnly, "restore-only", o.RestoreOnly, "Run the server in restore-only mode. Optional.")
 	flags.BoolVar(&o.DryRun, "dry-run", o.DryRun, "Generate resources, but don't send them to the cluster. Use with -o. Optional.")
 	flags.BoolVar(&o.UseNodeAgent, "use-node-agent", o.UseNodeAgent, "Create Velero node-agent daemonset. Optional. Velero node-agent hosts Velero modules that need to run in one or more nodes(i.e. Restic, Kopia).")
+	flags.BoolVar(&o.PrivilegedNodeAgent, "privileged-node-agent", o.PrivilegedNodeAgent, "Use privileged mode for the node agent. Optional. Required to backup block devices.")
 	flags.BoolVar(&o.Wait, "wait", o.Wait, "Wait for Velero deployment to be ready. Optional.")
 	flags.DurationVar(&o.DefaultRepoMaintenanceFrequency, "default-repo-maintain-frequency", o.DefaultRepoMaintenanceFrequency, "How often 'maintain' is run for backup repositories by default. Optional.")
 	flags.DurationVar(&o.GarbageCollectionFrequency, "garbage-collection-frequency", o.GarbageCollectionFrequency, "How often the garbage collection runs for expired backups.(default 1h)")
@@ -198,6 +200,7 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) {
 		SecretData:                      secretData,
 		RestoreOnly:                     o.RestoreOnly,
 		UseNodeAgent:                    o.UseNodeAgent,
+		PrivilegedNodeAgent:             o.PrivilegedNodeAgent,
 		UseVolumeSnapshots:              o.UseVolumeSnapshots,
 		BSLConfig:                       o.BackupStorageConfig.Data(),
 		VSLConfig:                       o.VolumeSnapshotConfig.Data(),
diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go
index 18f7e09f9..8bc650f5f 100644
--- a/pkg/controller/data_upload_controller.go
+++ b/pkg/controller/data_upload_controller.go
@@ -177,7 +177,10 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			return ctrl.Result{}, nil
 		}
 
-		exposeParam := r.setupExposeParam(du)
+		exposeParam, err := r.setupExposeParam(du)
+		if err != nil {
+			return r.errorOut(ctx, du, err, "failed to set exposer parameters", log)
+		}
 
 		// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
 		// but the pod maybe is not in the same node of the current controller, so we need to return it here.
@@ -735,18 +738,33 @@ func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string)
 	r.dataPathMgr.RemoveAsyncBR(duName)
 }
 
-func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) interface{} {
+func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) (interface{}, error) {
 	if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
+		pvc := &corev1.PersistentVolumeClaim{}
+		err := r.client.Get(context.Background(), types.NamespacedName{
+			Namespace: du.Spec.SourceNamespace,
+			Name:      du.Spec.SourcePVC,
+		}, pvc)
+
+		if err != nil {
+			return nil, errors.Wrapf(err, "failed to get PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
+		}
+
+		accessMode := exposer.AccessModeFileSystem
+		if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == corev1.PersistentVolumeBlock {
+			accessMode = exposer.AccessModeBlock
+		}
+
 		return &exposer.CSISnapshotExposeParam{
 			SnapshotName:     du.Spec.CSISnapshot.VolumeSnapshot,
 			SourceNamespace:  du.Spec.SourceNamespace,
 			StorageClass:     du.Spec.CSISnapshot.StorageClass,
 			HostingPodLabels: map[string]string{velerov1api.DataUploadLabel: du.Name},
-			AccessMode:       exposer.AccessModeFileSystem,
+			AccessMode:       accessMode,
 			Timeout:          du.Spec.OperationTimeout.Duration,
-		}
+		}, nil
 	}
-	return nil
+	return nil, nil
 }
 
 func (r *DataUploadReconciler) setupWaitExposePara(du *velerov2alpha1api.DataUpload) interface{} {
diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go
index 270a084ad..34bc4a6aa 100644
--- a/pkg/controller/data_upload_controller_test.go
+++ b/pkg/controller/data_upload_controller_test.go
@@ -306,6 +306,7 @@ func TestReconcile(t *testing.T) {
 		name                string
 		du                  *velerov2alpha1api.DataUpload
 		pod                 *corev1.Pod
+		pvc                 *corev1.PersistentVolumeClaim
 		snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
 		dataMgr             *datapath.Manager
 		expectedProcessed   bool
@@ -345,11 +346,21 @@ func TestReconcile(t *testing.T) {
 		}, {
 			name:              "Dataupload should be accepted",
 			du:                dataUploadBuilder().Result(),
-			pod:               builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
+			pod:               builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(),
+			pvc:               builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(),
 			expectedProcessed: false,
 			expected:          dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
 			expectedRequeue:   ctrl.Result{},
 		},
+		{
+			name:              "Dataupload should fail to get PVC information",
+			du:                dataUploadBuilder().Result(),
+			pod:               builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "wrong-pvc"}).Result(),
+			expectedProcessed: true,
+			expected:          dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
+			expectedRequeue:   ctrl.Result{},
+			expectedErrMsg:    "failed to get PVC",
+		},
 		{
 			name:              "Dataupload should be prepared",
 			du:                dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(),
@@ -448,6 +459,11 @@ func TestReconcile(t *testing.T) {
 				require.NoError(t, err)
 			}
 
+			if test.pvc != nil {
+				err = r.client.Create(ctx, test.pvc)
+				require.NoError(t, err)
+			}
+
 			if test.dataMgr != nil {
 				r.dataPathMgr = test.dataMgr
 			} else {
diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go
index 741f6ae08..fba9eac7b 100644
--- a/pkg/datapath/file_system.go
+++ b/pkg/datapath/file_system.go
@@ -133,10 +133,10 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren
 	if !fs.initialized {
 		return errors.New("file system data path is not initialized")
 	}
-	volMode := getPersistentVolumeMode(source)
 
 	go func() {
-		snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, volMode, fs)
+		snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull,
+			parentSnapshot, source.VolMode, fs)
 
 		if err == provider.ErrorCanceled {
 			fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
@@ -155,10 +155,8 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
 		return errors.New("file system data path is not initialized")
 	}
 
-	volMode := getPersistentVolumeMode(target)
-
 	go func() {
-		err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, volMode, fs)
+		err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, fs)
 
 		if err == provider.ErrorCanceled {
 			fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
@@ -172,13 +170,6 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
 	return nil
 }
 
-func getPersistentVolumeMode(source AccessPoint) uploader.PersistentVolumeMode {
-	if source.ByBlock != "" {
-		return uploader.PersistentVolumeBlock
-	}
-	return uploader.PersistentVolumeFilesystem
-}
-
 // UpdateProgress which implement ProgressUpdater interface to update progress status
 func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) {
 	if fs.callbacks.OnProgress != nil {
diff --git a/pkg/datapath/types.go b/pkg/datapath/types.go
index 431fccc5d..e26cf9482 100644
--- a/pkg/datapath/types.go
+++ b/pkg/datapath/types.go
@@ -53,7 +53,7 @@ type Callbacks struct {
 // AccessPoint represents an access point that has been exposed to a data path instance
 type AccessPoint struct {
 	ByPath  string
-	ByBlock string
+	VolMode uploader.PersistentVolumeMode
 }
 
 // AsyncBR is the interface for asynchronous data path methods
diff --git a/pkg/exposer/csi_snapshot.go b/pkg/exposer/csi_snapshot.go
index e0842f5f1..a0492d523 100644
--- a/pkg/exposer/csi_snapshot.go
+++ b/pkg/exposer/csi_snapshot.go
@@ -233,9 +233,12 @@ func (e *csiSnapshotExposer) CleanUp(ctx context.Context, ownerObject corev1.Obj
 }
 
 func getVolumeModeByAccessMode(accessMode string) (corev1.PersistentVolumeMode, error) {
-	if accessMode == AccessModeFileSystem {
+	switch accessMode {
+	case AccessModeFileSystem:
 		return corev1.PersistentVolumeFilesystem, nil
-	} else {
+	case AccessModeBlock:
+		return corev1.PersistentVolumeBlock, nil
+	default:
 		return "", errors.Errorf("unsupported access mode %s", accessMode)
 	}
 }
@@ -356,6 +359,7 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co
 	}
 
 	var gracePeriod int64 = 0
+	volumeMounts, volumeDevices := kube.MakePodPVCAttachment(volumeName, backupPVC.Spec.VolumeMode)
 
 	pod := &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
@@ -379,10 +383,8 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co
 					Image:           podInfo.image,
 					ImagePullPolicy: corev1.PullNever,
 					Command:         []string{"/velero-helper", "pause"},
-					VolumeMounts: []corev1.VolumeMount{{
-						Name:      volumeName,
-						MountPath: "/" + volumeName,
-					}},
+					VolumeMounts:    volumeMounts,
+					VolumeDevices:   volumeDevices,
 				},
 			},
 			ServiceAccountName:            podInfo.serviceAccount,
diff --git a/pkg/exposer/generic_restore.go b/pkg/exposer/generic_restore.go
index ca5cd68a3..0868aba47 100644
--- a/pkg/exposer/generic_restore.go
+++ b/pkg/exposer/generic_restore.go
@@ -82,7 +82,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.O
 		return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName)
 	}
 
-	restorePod, err := e.createRestorePod(ctx, ownerObject, hostingPodLabels, selectedNode)
+	restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, hostingPodLabels, selectedNode)
 	if err != nil {
 		return errors.Wrapf(err, "error to create restore pod")
 	}
@@ -247,7 +247,8 @@ func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject co
 	return nil
 }
 
-func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, label map[string]string, selectedNode string) (*corev1.Pod, error) {
+func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, targetPVC *corev1.PersistentVolumeClaim,
+	label map[string]string, selectedNode string) (*corev1.Pod, error) {
 	restorePodName := ownerObject.Name
 	restorePVCName := ownerObject.Name
 
@@ -260,6 +261,7 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec
 	}
 
 	var gracePeriod int64 = 0
+	volumeMounts, volumeDevices := kube.MakePodPVCAttachment(volumeName, targetPVC.Spec.VolumeMode)
 
 	pod := &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
@@ -283,10 +285,8 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec
 					Image:           podInfo.image,
 					ImagePullPolicy: corev1.PullNever,
 					Command:         []string{"/velero-helper", "pause"},
-					VolumeMounts: []corev1.VolumeMount{{
-						Name:      volumeName,
-						MountPath: "/" + volumeName,
-					}},
+					VolumeMounts:    volumeMounts,
+					VolumeDevices:   volumeDevices,
 				},
 			},
 			ServiceAccountName:            podInfo.serviceAccount,
diff --git a/pkg/exposer/host_path.go b/pkg/exposer/host_path.go
index 458667d92..94dc4503c 100644
--- a/pkg/exposer/host_path.go
+++ b/pkg/exposer/host_path.go
@@ -26,11 +26,13 @@ import (
 	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/vmware-tanzu/velero/pkg/datapath"
+	"github.com/vmware-tanzu/velero/pkg/uploader"
 	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
 	"github.com/vmware-tanzu/velero/pkg/util/kube"
 )
 
 var getVolumeDirectory = kube.GetVolumeDirectory
+var getVolumeMode = kube.GetVolumeMode
 var singlePathMatch = kube.SinglePathMatch
 
 // GetPodVolumeHostPath returns a path that can be accessed from the host for a given volume of a pod
@@ -45,7 +47,17 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1.Pod, volumeName strin
 
 	logger.WithField("volDir", volDir).Info("Got volume dir")
 
-	pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pod.GetUID()), volDir)
+	volMode, err := getVolumeMode(ctx, logger, pod, volumeName, cli)
+	if err != nil {
+		return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume mode for volume %s in pod %s", volumeName, pod.Name)
+	}
+
+	volSubDir := "volumes"
+	if volMode == uploader.PersistentVolumeBlock {
+		volSubDir = "volumeDevices"
+	}
+
+	pathGlob := fmt.Sprintf("/host_pods/%s/%s/*/%s", string(pod.GetUID()), volSubDir, volDir)
 	logger.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")
 
 	path, err := singlePathMatch(pathGlob, fs, logger)
@@ -56,6 +68,7 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1.Pod, volumeName strin
 	logger.WithField("path", path).Info("Found path matching glob")
 
 	return datapath.AccessPoint{
-		ByPath: path,
+		ByPath:  path,
+		VolMode: volMode,
 	}, nil
 }
diff --git a/pkg/exposer/host_path_test.go b/pkg/exposer/host_path_test.go
index f71518d2d..1022dffd3 100644
--- a/pkg/exposer/host_path_test.go
+++ b/pkg/exposer/host_path_test.go
@@ -29,17 +29,19 @@ import (
 	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
 	"github.com/vmware-tanzu/velero/pkg/builder"
 	velerotest "github.com/vmware-tanzu/velero/pkg/test"
+	"github.com/vmware-tanzu/velero/pkg/uploader"
 	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
 )
 
 func TestGetPodVolumeHostPath(t *testing.T) {
 	tests := []struct {
-		name             string
-		getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error)
-		pathMatchFunc    func(string, filesystem.Interface, logrus.FieldLogger) (string, error)
-		pod              *corev1.Pod
-		pvc              string
-		err              string
+		name              string
+		getVolumeDirFunc  func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error)
+		getVolumeModeFunc func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error)
+		pathMatchFunc     func(string, filesystem.Interface, logrus.FieldLogger) (string, error)
+		pod               *corev1.Pod
+		pvc               string
+		err               string
 	}{
 		{
 			name: "get volume dir fail",
@@ -55,6 +57,9 @@ func TestGetPodVolumeHostPath(t *testing.T) {
 			getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error) {
 				return "", nil
 			},
+			getVolumeModeFunc: func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error) {
+				return uploader.PersistentVolumeFilesystem, nil
+			},
 			pathMatchFunc: func(string, filesystem.Interface, logrus.FieldLogger) (string, error) {
 				return "", errors.New("fake-error-2")
 			},
@@ -62,6 +67,18 @@ func TestGetPodVolumeHostPath(t *testing.T) {
 			pvc: "fake-pvc-1",
 			err: "error identifying unique volume path on host for volume fake-pvc-1 in pod fake-pod-2: fake-error-2",
 		},
+		{
+			name: "get block volume dir success",
+			getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (
+				string, error) {
+				return "fake-pvc-1", nil
+			},
+			pathMatchFunc: func(string, filesystem.Interface, logrus.FieldLogger) (string, error) {
+				return "/host_pods/fake-pod-1-id/volumeDevices/kubernetes.io~csi/fake-pvc-1-id", nil
+			},
+			pod: builder.ForPod(velerov1api.DefaultNamespace, "fake-pod-1").Result(),
+			pvc: "fake-pvc-1",
+		},
 	}
 
 	for _, test := range tests {
@@ -70,12 +87,18 @@ func TestGetPodVolumeHostPath(t *testing.T) {
 				getVolumeDirectory = test.getVolumeDirFunc
 			}
 
+			if test.getVolumeModeFunc != nil {
+				getVolumeMode = test.getVolumeModeFunc
+			}
+
 			if test.pathMatchFunc != nil {
 				singlePathMatch = test.pathMatchFunc
 			}
 
 			_, err := GetPodVolumeHostPath(context.Background(), test.pod, test.pvc, nil, nil, velerotest.NewLogger())
-			assert.EqualError(t, err, test.err)
+			if test.err != "" || err != nil {
+				assert.EqualError(t, err, test.err)
+			}
 		})
 	}
 }
diff --git a/pkg/exposer/types.go b/pkg/exposer/types.go
index 253256eb9..21c473366 100644
--- a/pkg/exposer/types.go
+++ b/pkg/exposer/types.go
@@ -22,6 +22,7 @@ import (
 
 const (
 	AccessModeFileSystem = "by-file-system"
+	AccessModeBlock      = "by-block-device"
 )
 
 // ExposeResult defines the result of expose.
diff --git a/pkg/install/daemonset.go b/pkg/install/daemonset.go
index b139f8124..8e74e16da 100644
--- a/pkg/install/daemonset.go
+++ b/pkg/install/daemonset.go
@@ -86,6 +86,14 @@ func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1.DaemonSet {
 								},
 							},
 						},
+						{
+							Name: "host-plugins",
+							VolumeSource: corev1.VolumeSource{
+								HostPath: &corev1.HostPathVolumeSource{
+									Path: "/var/lib/kubelet/plugins",
+								},
+							},
+						},
 						{
 							Name: "scratch",
 							VolumeSource: corev1.VolumeSource{
@@ -102,13 +110,20 @@ func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1.DaemonSet {
 								"/velero",
 							},
 							Args: daemonSetArgs,
-
+							SecurityContext: &corev1.SecurityContext{
+								Privileged: &c.privilegedNodeAgent,
+							},
 							VolumeMounts: []corev1.VolumeMount{
 								{
 									Name:             "host-pods",
 									MountPath:        "/host_pods",
 									MountPropagation: &mountPropagationMode,
 								},
+								{
+									Name:             "host-plugins",
+									MountPath:        "/var/lib/kubelet/plugins",
+									MountPropagation: &mountPropagationMode,
+								},
 								{
 									Name:      "scratch",
 									MountPath: "/scratch",
diff --git a/pkg/install/daemonset_test.go b/pkg/install/daemonset_test.go
index 762e95b16..017d5004d 100644
--- a/pkg/install/daemonset_test.go
+++ b/pkg/install/daemonset_test.go
@@ -35,7 +35,7 @@ func TestDaemonSet(t *testing.T) {
 
 	ds = DaemonSet("velero", WithSecret(true))
 	assert.Equal(t, 7, len(ds.Spec.Template.Spec.Containers[0].Env))
-	assert.Equal(t, 3, len(ds.Spec.Template.Spec.Volumes))
+	assert.Equal(t, 4, len(ds.Spec.Template.Spec.Volumes))
 
 	ds = DaemonSet("velero", WithFeatures([]string{"foo,bar,baz"}))
 	assert.Len(t, ds.Spec.Template.Spec.Containers[0].Args, 3)
diff --git a/pkg/install/deployment.go b/pkg/install/deployment.go
index 993d4b16f..917f7d9f1 100644
--- a/pkg/install/deployment.go
+++ b/pkg/install/deployment.go
@@ -47,6 +47,7 @@ type podTemplateConfig struct {
 	serviceAccountName              string
 	uploaderType                    string
 	defaultSnapshotMoveData         bool
+	privilegedNodeAgent             bool
 }
 
 func WithImage(image string) podTemplateOption {
@@ -149,6 +150,12 @@ func WithServiceAccountName(sa string) podTemplateOption {
 	}
 }
 
+func WithPrivilegedNodeAgent() podTemplateOption {
+	return func(c *podTemplateConfig) {
+		c.privilegedNodeAgent = true
+	}
+}
+
 func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment {
 	// TODO: Add support for server args
 	c := &podTemplateConfig{
diff --git a/pkg/install/resources.go b/pkg/install/resources.go
index a029ecc4a..9e2e8e4da 100644
--- a/pkg/install/resources.go
+++ b/pkg/install/resources.go
@@ -240,6 +240,7 @@ type VeleroOptions struct {
 	SecretData                      []byte
 	RestoreOnly                     bool
 	UseNodeAgent                    bool
+	PrivilegedNodeAgent             bool
 	UseVolumeSnapshots              bool
 	BSLConfig                       map[string]string
 	VSLConfig                       map[string]string
@@ -374,6 +375,9 @@ func AllResources(o *VeleroOptions) *unstructured.UnstructuredList {
 		if len(o.Features) > 0 {
 			dsOpts = append(dsOpts, WithFeatures(o.Features))
 		}
+		if o.PrivilegedNodeAgent {
+			dsOpts = append(dsOpts, WithPrivilegedNodeAgent())
+		}
 		ds := DaemonSet(o.Namespace, dsOpts...)
 		if err := appendUnstructured(resources, ds); err != nil {
 			fmt.Printf("error appending DaemonSet %s: %s\n", ds.GetName(), err.Error())
diff --git a/pkg/podvolume/backupper.go b/pkg/podvolume/backupper.go
index e7cfa2e5c..78c2b6d65 100644
--- a/pkg/podvolume/backupper.go
+++ b/pkg/podvolume/backupper.go
@@ -200,10 +200,11 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
 	b.resultsLock.Unlock()
 
 	var (
-		errs              []error
-		podVolumeBackups  []*velerov1api.PodVolumeBackup
-		podVolumes        = make(map[string]corev1api.Volume)
-		mountedPodVolumes = sets.String{}
+		errs               []error
+		podVolumeBackups   []*velerov1api.PodVolumeBackup
+		podVolumes         = make(map[string]corev1api.Volume)
+		mountedPodVolumes  = sets.String{}
+		attachedPodDevices = sets.String{}
 	)
 	pvcSummary := NewPVCBackupSummary()
 
@@ -233,6 +234,9 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
 		for _, volumeMount := range container.VolumeMounts {
 			mountedPodVolumes.Insert(volumeMount.Name)
 		}
+		for _, volumeDevice := range container.VolumeDevices {
+			attachedPodDevices.Insert(volumeDevice.Name)
+		}
 	}
 
 	repoIdentifier := ""
@@ -268,6 +272,15 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
 			continue
 		}
 
+		// check if volume is a block volume
+		if attachedPodDevices.Has(volumeName) {
+			msg := fmt.Sprintf("volume %s declared in pod %s/%s is a block volume. Block volumes are not supported for fs backup, skipping",
+				volumeName, pod.Namespace, pod.Name)
+			log.Warn(msg)
+			pvcSummary.addSkipped(volumeName, msg)
+			continue
+		}
+
 		// volumes that are not mounted by any container should not be backed up, because
 		// its directory is not created
 		if !mountedPodVolumes.Has(volumeName) {
diff --git a/pkg/uploader/kopia/block_backup.go b/pkg/uploader/kopia/block_backup.go
new file mode 100644
index 000000000..a637925a4
--- /dev/null
+++ b/pkg/uploader/kopia/block_backup.go
@@ -0,0 +1,55 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kopia
+
+import (
+	"os"
+	"syscall"
+
+	"github.com/kopia/kopia/fs"
+	"github.com/kopia/kopia/fs/virtualfs"
+	"github.com/pkg/errors"
+)
+
+const ErrNotPermitted = "operation not permitted"
+
+func getLocalBlockEntry(sourcePath string) (fs.Entry, error) {
+	source, err := resolveSymlink(sourcePath)
+	if err != nil {
+		return nil, errors.Wrap(err, "resolveSymlink")
+	}
+
+	fileInfo, err := os.Lstat(source)
+	if err != nil {
+		return nil, errors.Wrapf(err, "unable to get the source device information %s", source)
+	}
+
+	if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
+		return nil, errors.Errorf("source path %s is not a block device", source)
+	}
+
+	device, err := os.Open(source)
+	if err != nil {
+		if os.IsPermission(err) || err.Error() == ErrNotPermitted {
+			return nil, errors.Wrapf(err, "no permission to open the source device %s, make sure that node agent is running in privileged mode", source)
+		}
+		return nil, errors.Wrapf(err, "unable to open the source device %s", source)
+	}
+
+	sf := virtualfs.StreamingFileFromReader(source, device)
+	return virtualfs.NewStaticDirectory(source, []fs.Entry{sf}), nil
+}
diff --git a/pkg/uploader/kopia/block_restore.go b/pkg/uploader/kopia/block_restore.go
new file mode 100644
index 000000000..25d11ee24
--- /dev/null
+++ b/pkg/uploader/kopia/block_restore.go
@@ -0,0 +1,99 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kopia
+
+import (
+	"context"
+	"io"
+	"os"
+	"path/filepath"
+	"syscall"
+
+	"github.com/kopia/kopia/fs"
+	"github.com/kopia/kopia/snapshot/restore"
+	"github.com/pkg/errors"
+)
+
+type BlockOutput struct {
+	*restore.FilesystemOutput
+
+	targetFileName string
+}
+
+var _ restore.Output = &BlockOutput{}
+
+const bufferSize = 128 * 1024
+
+func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remoteFile fs.File) error {
+	remoteReader, err := remoteFile.Open(ctx)
+	if err != nil {
+		return errors.Wrapf(err, "failed to open remote file %s", remoteFile.Name())
+	}
+	defer remoteReader.Close()
+
+	targetFile, err := os.Create(o.targetFileName)
+	if err != nil {
+		return errors.Wrapf(err, "failed to open file %s", o.targetFileName)
+	}
+	defer targetFile.Close()
+
+	buffer := make([]byte, bufferSize)
+
+	readData := true
+	for readData {
+		bytesToWrite, err := remoteReader.Read(buffer)
+		if err != nil {
+			if err != io.EOF {
+				return errors.Wrapf(err, "failed to read data from remote file %s", o.targetFileName)
+			}
+			readData = false
+		}
+
+		if bytesToWrite > 0 {
+			offset := 0
+			for bytesToWrite > 0 {
+				if bytesWritten, err := targetFile.Write(buffer[offset:bytesToWrite]); err == nil {
+					bytesToWrite -= bytesWritten
+					offset += bytesWritten
+				} else {
+					return errors.Wrapf(err, "failed to write data to file %s", o.targetFileName)
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
+func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error {
+	var err error
+	o.targetFileName, err = filepath.EvalSymlinks(o.TargetPath)
+	if err != nil {
+		return errors.Wrapf(err, "unable to evaluate symlinks for %s", o.targetFileName)
+	}
+
+	fileInfo, err := os.Lstat(o.targetFileName)
+	if err != nil {
+		return errors.Wrapf(err, "unable to get the target device information for %s", o.TargetPath)
+	}
+
+	if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
+		return errors.Errorf("target file %s is not a block device", o.TargetPath)
+	}
+
+	return nil
+}
diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go
index 02129c9f0..27ac84253 100644
--- a/pkg/uploader/kopia/snapshot.go
+++ b/pkg/uploader/kopia/snapshot.go
@@ -131,25 +131,22 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re
 	if fsUploader == nil {
 		return nil, false, errors.New("get empty kopia uploader")
 	}
-
-	if volMode == uploader.PersistentVolumeBlock {
-		return nil, false, errors.New("unable to handle block storage")
-	}
-
-	dir, err := filepath.Abs(sourcePath)
+	source, err := filepath.Abs(sourcePath)
 	if err != nil {
 		return nil, false, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)
 	}
 
-	// to be consistent with restic when backup empty dir returns one error for upper logic handle
-	dirs, err := os.ReadDir(dir)
-	if err != nil {
-		return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", dir)
-	} else if len(dirs) == 0 {
-		return nil, true, nil
+	if volMode == uploader.PersistentVolumeFilesystem {
+		// to be consistent with restic when backup empty dir returns one error for upper logic handle
+		dirs, err := os.ReadDir(source)
+		if err != nil {
+			return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", source)
+		} else if len(dirs) == 0 {
+			return nil, true, nil
+		}
 	}
 
-	dir = filepath.Clean(dir)
+	source = filepath.Clean(source)
 
 	sourceInfo := snapshot.SourceInfo{
 		UserName: udmrepo.GetRepoUser(),
@@ -157,16 +154,25 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re
 		Path:     filepath.Clean(realSource),
 	}
 	if realSource == "" {
-		sourceInfo.Path = dir
+		sourceInfo.Path = source
 	}
 
-	rootDir, err := getLocalFSEntry(dir)
-	if err != nil {
-		return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
+	var sourceEntry fs.Entry
+
+	if volMode == uploader.PersistentVolumeBlock {
+		sourceEntry, err = getLocalBlockEntry(source)
+		if err != nil {
+			return nil, false, errors.Wrap(err, "unable to get local block device entry")
+		}
+	} else {
+		sourceEntry, err = getLocalFSEntry(source)
+		if err != nil {
+			return nil, false, errors.Wrap(err, "unable to get local filesystem entry")
+		}
 	}
 
 	kopiaCtx := kopia.SetupKopiaLog(ctx, log)
-	snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, rootDir, forceFull, parentSnapshot, tags, log, "Kopia Uploader")
+	snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, log, "Kopia Uploader")
 	if err != nil {
 		return nil, false, err
 	}
@@ -348,7 +354,8 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour
 }
 
 // Restore restore specific sourcePath with given snapshotID and update progress
-func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
+func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode,
+	log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
 	log.Info("Start to restore...")
 
 	kopiaCtx := kopia.SetupKopiaLog(ctx, log)
@@ -370,7 +377,7 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
 		return 0, 0, errors.Wrapf(err, "Unable to resolve path %v", dest)
 	}
 
-	output := &restore.FilesystemOutput{
+	fsOutput := &restore.FilesystemOutput{
 		TargetPath:             path,
 		OverwriteDirectories:   true,
 		OverwriteFiles:         true,
@@ -378,11 +385,18 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
 		IgnorePermissionErrors: true,
 	}
 
-	err = output.Init(ctx)
+	err = fsOutput.Init(ctx)
 	if err != nil {
 		return 0, 0, errors.Wrap(err, "error to init output")
 	}
 
+	var output restore.Output = fsOutput
+	if volMode == uploader.PersistentVolumeBlock {
+		output = &BlockOutput{
+			FilesystemOutput: fsOutput,
+		}
+	}
+
 	stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{
 		Parallel:               runtime.NumCPU(),
 		RestoreDirEntryAtDepth: math.MaxInt32,
diff --git a/pkg/uploader/kopia/snapshot_test.go b/pkg/uploader/kopia/snapshot_test.go
index 232ea92c4..65ec22136 100644
--- a/pkg/uploader/kopia/snapshot_test.go
+++ b/pkg/uploader/kopia/snapshot_test.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/kopia/kopia/fs"
+	"github.com/kopia/kopia/fs/virtualfs"
 	"github.com/kopia/kopia/repo"
 	"github.com/kopia/kopia/repo/manifest"
 	"github.com/kopia/kopia/snapshot"
@@ -594,11 +595,11 @@ func TestBackup(t *testing.T) {
 			expectedError: errors.New("Unable to read dir"),
 		},
 		{
-			name:          "Unable to handle block mode",
+			name:          "Source path is not a block device",
 			sourcePath:    "/",
 			tags:          nil,
 			volMode:       uploader.PersistentVolumeBlock,
-			expectedError: errors.New("unable to handle block storage"),
+			expectedError: errors.New("source path / is not a block device"),
 		},
 	}
 
@@ -660,6 +661,7 @@ func TestRestore(t *testing.T) {
 		expectedBytes       int64
 		expectedCount       int32
 		expectedError       error
+		volMode             uploader.PersistentVolumeMode
 	}
 
 	// Define test cases
@@ -697,6 +699,46 @@ func TestRestore(t *testing.T) {
 			snapshotID:    "snapshot-123",
 			expectedError: nil,
 		},
+		{
+			name: "Expect block volume successful",
+			filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
+				return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
+			},
+			restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
+				return restore.Stats{}, nil
+			},
+			snapshotID:    "snapshot-123",
+			expectedError: nil,
+			volMode:       uploader.PersistentVolumeBlock,
+		},
+		{
+			name: "Unable to evaluate symlinks for block volume",
+			filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
+				return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
+			},
+			restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
+				err := output.BeginDirectory(ctx, "fake-dir", virtualfs.NewStaticDirectory("fake-dir", nil))
+				return restore.Stats{}, err
+			},
+			snapshotID:    "snapshot-123",
+			expectedError: errors.New("unable to evaluate symlinks for"),
+			volMode:       uploader.PersistentVolumeBlock,
+			dest:          "/wrong-dest",
+		},
+		{
+			name: "Target file is not a block device",
+			filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
+				return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
+			},
+			restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
+				err := output.BeginDirectory(ctx, "fake-dir", virtualfs.NewStaticDirectory("fake-dir", nil))
+				return restore.Stats{}, err
+			},
+			snapshotID:    "snapshot-123",
+			expectedError: errors.New("target file /tmp is not a block device"),
+			volMode:       uploader.PersistentVolumeBlock,
+			dest:          "/tmp",
+		},
 	}
 
 	em := &manifest.EntryMetadata{
@@ -706,6 +748,10 @@ func TestRestore(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
+			if tc.volMode == "" {
+				tc.volMode = uploader.PersistentVolumeFilesystem
+			}
+
 			if tc.invalidManifestType {
 				em.Labels[manifest.TypeLabelKey] = ""
 			} else {
@@ -725,7 +771,7 @@ func TestRestore(t *testing.T) {
 			repoWriterMock.On("OpenObject", mock.Anything, mock.Anything).Return(em, nil)
 
 			progress := new(Progress)
-			bytesRestored, fileCount, err := Restore(context.Background(), repoWriterMock, progress, tc.snapshotID, tc.dest, logrus.New(), nil)
+			bytesRestored, fileCount, err := Restore(context.Background(), repoWriterMock, progress, tc.snapshotID, tc.dest, tc.volMode, logrus.New(), nil)
 
 			// Check if the returned error matches the expected error
 			if tc.expectedError != nil {
diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go
index dd32173b8..706393362 100644
--- a/pkg/uploader/provider/kopia.go
+++ b/pkg/uploader/provider/kopia.go
@@ -128,11 +128,6 @@ func (kp *kopiaProvider) RunBackup(
 		return "", false, errors.New("path is empty")
 	}
 
-	// For now, error on block mode
-	if volMode == uploader.PersistentVolumeBlock {
-		return "", false, errors.New("unable to currently support block mode")
-	}
-
 	log := kp.log.WithFields(logrus.Fields{
 		"path":           path,
 		"realSource":     realSource,
@@ -214,10 +209,6 @@ func (kp *kopiaProvider) RunRestore(
 		"volumePath": volumePath,
 	})
 
-	if volMode == uploader.PersistentVolumeBlock {
-		return errors.New("unable to currently support block mode")
-	}
-
 	repoWriter := kopia.NewShimRepo(kp.bkRepo)
 	progress := new(kopia.Progress)
 	progress.InitThrottle(restoreProgressCheckInterval)
@@ -235,7 +226,7 @@ func (kp *kopiaProvider) RunRestore(
 	// We use the cancel channel to control the restore cancel, so don't pass a context with cancel to Kopia restore.
 	// Otherwise, Kopia restore will not response to the cancel control but return an arbitrary error.
 	// Kopia restore cancel is not designed as well as Kopia backup which uses the context to control backup cancel all the way.
-	size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, log, restoreCancel)
+	size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, volMode, log, restoreCancel)
 
 	if err != nil {
 		return errors.Wrapf(err, "Failed to run kopia restore")
diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go
index 944cdbcce..c38d370ce 100644
--- a/pkg/uploader/provider/kopia_test.go
+++ b/pkg/uploader/provider/kopia_test.go
@@ -94,12 +94,12 @@ func TestRunBackup(t *testing.T) {
 			notError: false,
 		},
 		{
-			name: "error on vol mode",
+			name: "success to backup block mode volume",
 			hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
-				return nil, true, nil
+				return &uploader.SnapshotInfo{}, false, nil
 			},
 			volMode:  uploader.PersistentVolumeBlock,
-			notError: false,
+			notError: true,
 		},
 	}
 	for _, tc := range testCases {
@@ -125,31 +125,31 @@ func TestRunRestore(t *testing.T) {
 
 	testCases := []struct {
 		name            string
-		hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error)
+		hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error)
 		notError        bool
 		volMode         uploader.PersistentVolumeMode
 	}{
 		{
 			name: "normal restore",
-			hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
+			hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
 				return 0, 0, nil
 			},
 			notError: true,
 		},
 		{
 			name: "failed to restore",
-			hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
+			hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
 				return 0, 0, errors.New("failed to restore")
 			},
 			notError: false,
 		},
 		{
-			name: "failed to restore block mode",
-			hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
-				return 0, 0, errors.New("failed to restore")
+			name: "normal block mode restore",
+			hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
+				return 0, 0, nil
 			},
 			volMode:  uploader.PersistentVolumeBlock,
-			notError: false,
+			notError: true,
 		},
 	}
 
diff --git a/pkg/uploader/provider/restic_test.go b/pkg/uploader/provider/restic_test.go
index f2203d7bd..62f44d04f 100644
--- a/pkg/uploader/provider/restic_test.go
+++ b/pkg/uploader/provider/restic_test.go
@@ -212,6 +212,9 @@ func TestResticRunRestore(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
+			if tc.volMode == "" {
+				tc.volMode = uploader.PersistentVolumeFilesystem
+			}
 			resticRestoreCMDFunc = tc.hookResticRestoreFunc
 			if tc.volMode == "" {
 				tc.volMode = uploader.PersistentVolumeFilesystem
diff --git a/pkg/util/kube/pvc_pv.go b/pkg/util/kube/pvc_pv.go
index 2af818d90..1a393b3ab 100644
--- a/pkg/util/kube/pvc_pv.go
+++ b/pkg/util/kube/pvc_pv.go
@@ -316,3 +316,23 @@ func WaitPVBound(ctx context.Context, pvGetter corev1client.CoreV1Interface, pvN
 func IsPVCBound(pvc *corev1api.PersistentVolumeClaim) bool {
 	return pvc.Spec.VolumeName != ""
 }
+
+// MakePodPVCAttachment returns the volume mounts and devices for a pod needed to attach a PVC
+func MakePodPVCAttachment(volumeName string, volumeMode *corev1api.PersistentVolumeMode) ([]corev1api.VolumeMount, []corev1api.VolumeDevice) {
+	var volumeMounts []corev1api.VolumeMount = nil
+	var volumeDevices []corev1api.VolumeDevice = nil
+
+	if volumeMode != nil && *volumeMode == corev1api.PersistentVolumeBlock {
+		volumeDevices = []corev1api.VolumeDevice{{
+			Name:       volumeName,
+			DevicePath: "/" + volumeName,
+		}}
+	} else {
+		volumeMounts = []corev1api.VolumeMount{{
+			Name:      volumeName,
+			MountPath: "/" + volumeName,
+		}}
+	}
+
+	return volumeMounts, volumeDevices
+}
diff --git a/pkg/util/kube/utils.go b/pkg/util/kube/utils.go
index 3d8d4e3ef..e1ed48dba 100644
--- a/pkg/util/kube/utils.go
+++ b/pkg/util/kube/utils.go
@@ -35,6 +35,7 @@ import (
 	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
+	"github.com/vmware-tanzu/velero/pkg/uploader"
 	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
 )
 
@@ -50,6 +51,8 @@ const (
 	KubeAnnSelectedNode           = "volume.kubernetes.io/selected-node"
 )
 
+var ErrorPodVolumeIsNotPVC = errors.New("pod volume is not a PVC")
+
 // NamespaceAndName returns a string in the format <namespace>/<name>
 func NamespaceAndName(objMeta metav1.Object) string {
 	if objMeta.GetNamespace() == "" {
@@ -122,6 +125,57 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core
 // where the specified volume lives.
 // For volumes with a CSIVolumeSource, append "/mount" to the directory name.
 func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (string, error) {
+	pvc, pv, volume, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli)
+	if err != nil {
+		// This case implies the administrator created the PV and attached it directly, without PVC.
+		// Note that only one VolumeSource can be populated per Volume on a pod
+		if err == ErrorPodVolumeIsNotPVC {
+			if volume.VolumeSource.CSI != nil {
+				return volume.Name + "/mount", nil
+			}
+			return volume.Name, nil
+		}
+		return "", errors.WithStack(err)
+	}
+
+	// Most common case is that we have a PVC VolumeSource, and we need to check the PV it points to for a CSI source.
+	// PV's been created with a CSI source.
+	isProvisionedByCSI, err := isProvisionedByCSI(log, pv, cli)
+	if err != nil {
+		return "", errors.WithStack(err)
+	}
+	if isProvisionedByCSI {
+		if pv.Spec.VolumeMode != nil && *pv.Spec.VolumeMode == corev1api.PersistentVolumeBlock {
+			return pvc.Spec.VolumeName, nil
+		}
+		return pvc.Spec.VolumeName + "/mount", nil
+	}
+
+	return pvc.Spec.VolumeName, nil
+}
+
+// GetVolumeMode gets the uploader.PersistentVolumeMode of the volume.
+func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (
+	uploader.PersistentVolumeMode, error) {
+	_, pv, _, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli)
+
+	if err != nil {
+		if err == ErrorPodVolumeIsNotPVC {
+			return uploader.PersistentVolumeFilesystem, nil
+		}
+		return "", errors.WithStack(err)
+	}
+
+	if pv.Spec.VolumeMode != nil && *pv.Spec.VolumeMode == corev1api.PersistentVolumeBlock {
+		return uploader.PersistentVolumeBlock, nil
+	}
+	return uploader.PersistentVolumeFilesystem, nil
+}
+
+// GetPodPVCVolume gets the PVC, PV and volume for a pod volume name.
+// Returns pod volume in case of ErrorPodVolumeIsNotPVC error
+func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (
+	*corev1api.PersistentVolumeClaim, *corev1api.PersistentVolume, *corev1api.Volume, error) {
 	var volume *corev1api.Volume
 
 	for i := range pod.Spec.Volumes {
@@ -132,41 +186,26 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1
 	}
 
 	if volume == nil {
-		return "", errors.New("volume not found in pod")
+		return nil, nil, nil, errors.New("volume not found in pod")
 	}
 
-	// This case implies the administrator created the PV and attached it directly, without PVC.
-	// Note that only one VolumeSource can be populated per Volume on a pod
 	if volume.VolumeSource.PersistentVolumeClaim == nil {
-		if volume.VolumeSource.CSI != nil {
-			return volume.Name + "/mount", nil
-		}
-		return volume.Name, nil
+		return nil, nil, volume, ErrorPodVolumeIsNotPVC // There is a pod volume but it is not a PVC
 	}
 
-	// Most common case is that we have a PVC VolumeSource, and we need to check the PV it points to for a CSI source.
 	pvc := &corev1api.PersistentVolumeClaim{}
 	err := cli.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.VolumeSource.PersistentVolumeClaim.ClaimName}, pvc)
 	if err != nil {
-		return "", errors.WithStack(err)
+		return nil, nil, nil, errors.WithStack(err)
 	}
 
 	pv := &corev1api.PersistentVolume{}
 	err = cli.Get(ctx, client.ObjectKey{Name: pvc.Spec.VolumeName}, pv)
 	if err != nil {
-		return "", errors.WithStack(err)
+		return nil, nil, nil, errors.WithStack(err)
 	}
 
-	// PV's been created with a CSI source.
-	isProvisionedByCSI, err := isProvisionedByCSI(log, pv, cli)
-	if err != nil {
-		return "", errors.WithStack(err)
-	}
-	if isProvisionedByCSI {
-		return pvc.Spec.VolumeName + "/mount", nil
-	}
-
-	return pvc.Spec.VolumeName, nil
+	return pvc, pv, volume, nil
 }
 
 // isProvisionedByCSI function checks whether this is a CSI PV by annotation.
diff --git a/pkg/util/kube/utils_test.go b/pkg/util/kube/utils_test.go
index 6edf08435..31110dc8c 100644
--- a/pkg/util/kube/utils_test.go
+++ b/pkg/util/kube/utils_test.go
@@ -38,6 +38,7 @@ import (
 
 	"github.com/vmware-tanzu/velero/pkg/builder"
 	velerotest "github.com/vmware-tanzu/velero/pkg/test"
+	"github.com/vmware-tanzu/velero/pkg/uploader"
 )
 
 func TestNamespaceAndName(t *testing.T) {
@@ -164,6 +165,13 @@ func TestGetVolumeDirectorySuccess(t *testing.T) {
 			pv:   builder.ForPersistentVolume("a-pv").CSI("csi.test.com", "provider-volume-id").Result(),
 			want: "a-pv/mount",
 		},
+		{
+			name: "Block CSI volume with a PVC/PV does not append '/mount' to the volume name",
+			pod:  builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").PersistentVolumeClaimSource("my-pvc").Result()).Result(),
+			pvc:  builder.ForPersistentVolumeClaim("ns-1", "my-pvc").VolumeName("a-pv").Result(),
+			pv:   builder.ForPersistentVolume("a-pv").CSI("csi.test.com", "provider-volume-id").VolumeMode(corev1.PersistentVolumeBlock).Result(),
+			want: "a-pv",
+		},
 		{
 			name: "CSI volume mounted without a PVC appends '/mount' to the volume name",
 			pod:  builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").CSISource("csi.test.com").Result()).Result(),
@@ -211,6 +219,54 @@ func TestGetVolumeDirectorySuccess(t *testing.T) {
 	}
 }
 
+// TestGetVolumeModeSuccess tests the GetVolumeMode function
+func TestGetVolumeModeSuccess(t *testing.T) {
+	tests := []struct {
+		name string
+		pod  *corev1.Pod
+		pvc  *corev1.PersistentVolumeClaim
+		pv   *corev1.PersistentVolume
+		want uploader.PersistentVolumeMode
+	}{
+		{
+			name: "Filesystem PVC volume",
+			pod:  builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").PersistentVolumeClaimSource("my-pvc").Result()).Result(),
+			pvc:  builder.ForPersistentVolumeClaim("ns-1", "my-pvc").VolumeName("a-pv").Result(),
+			pv:   builder.ForPersistentVolume("a-pv").VolumeMode(corev1.PersistentVolumeFilesystem).Result(),
+			want: uploader.PersistentVolumeFilesystem,
+		},
+		{
+			name: "Block PVC volume",
+			pod:  builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").PersistentVolumeClaimSource("my-pvc").Result()).Result(),
+			pvc:  builder.ForPersistentVolumeClaim("ns-1", "my-pvc").VolumeName("a-pv").Result(),
+			pv:   builder.ForPersistentVolume("a-pv").VolumeMode(corev1.PersistentVolumeBlock).Result(),
+			want: uploader.PersistentVolumeBlock,
+		},
+		{
+			name: "Pod volume without a PVC",
+			pod:  builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").Result()).Result(),
+			want: uploader.PersistentVolumeFilesystem,
+		},
+	}
+
+	for _, tc := range tests {
+		clientBuilder := fake.NewClientBuilder()
+
+		if tc.pvc != nil {
+			clientBuilder = clientBuilder.WithObjects(tc.pvc)
+		}
+		if tc.pv != nil {
+			clientBuilder = clientBuilder.WithObjects(tc.pv)
+		}
+
+		// Function under test
+		mode, err := GetVolumeMode(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build())
+
+		require.NoError(t, err)
+		assert.Equal(t, tc.want, mode)
+	}
+}
+
 func TestIsV1Beta1CRDReady(t *testing.T) {
 	tests := []struct {
 		name string
diff --git a/site/content/docs/main/csi-snapshot-data-movement.md b/site/content/docs/main/csi-snapshot-data-movement.md
index 667be0a48..b694da9f3 100644
--- a/site/content/docs/main/csi-snapshot-data-movement.md
+++ b/site/content/docs/main/csi-snapshot-data-movement.md
@@ -75,24 +75,10 @@ To mount the correct hostpath to pods volumes, run the node-agent pod in `privil
     oc adm policy add-scc-to-user privileged -z velero -n velero
     ```
 
-2. Modify the DaemonSet yaml to request a privileged mode:
-
-    ```diff
-    @@ -67,3 +67,5 @@ spec:
-                  value: /credentials/cloud
-                - name: VELERO_SCRATCH_DIR
-                  value: /scratch
-    +          securityContext:
-    +            privileged: true
+2. Install Velero with the '--privileged-node-agent' option to request a privileged mode:
+  
     ```
-
-    or
-
-    ```shell
-    oc patch ds/node-agent \
-      --namespace velero \
-      --type json \
-      -p '[{"op":"add","path":"/spec/template/spec/containers/0/securityContext","value": { "privileged": true}}]'
+    velero install --use-node-agent --privileged-node-agent
     ```
 
 If node-agent is not running in a privileged mode, it will not be able to access snapshot volumes within the mounted 
diff --git a/site/content/docs/main/customize-installation.md b/site/content/docs/main/customize-installation.md
index 0d8621685..a4e2299c8 100644
--- a/site/content/docs/main/customize-installation.md
+++ b/site/content/docs/main/customize-installation.md
@@ -23,6 +23,14 @@ By default, `velero install` does not install Velero's [File System Backup][3].
 
 If you've already run `velero install` without the `--use-node-agent` flag, you can run the same command again, including the `--use-node-agent` flag, to add the file system backup to your existing install.
 
+## CSI Snapshot Data Movement
+
+Velero node-agent is required by CSI snapshot data movement when Velero built-in data mover is used. By default, `velero install` does not install Velero's node-agent. To enable it, specify the `--use-node-agent` flag.
+
+For some use cases, Velero node-agent requires to run under privileged mode. For example, when backing up block volumes, it is required to allow the node-agent to access the block device. To enable it set velero install flags `--privileged-node-agent`.
+
+If you've already run `velero install` without the `--use-node-agent` or `--privileged-node-agent` flag, you can run the same command again, including the `--use-node-agent` or `--privileged-node-agent` flag, to add CSI snapshot data movement to your existing install.
+
 ## Default Pod Volume backup to file system backup
 
 By default, `velero install` does not enable the use of File System Backup (FSB) to take backups of all pod volumes. You must apply an [annotation](file-system-backup.md/#using-opt-in-pod-volume-backup) to every pod which contains volumes for Velero to use FSB for the backup.
diff --git a/site/content/docs/main/file-system-backup.md b/site/content/docs/main/file-system-backup.md
index 108daede4..30021806b 100644
--- a/site/content/docs/main/file-system-backup.md
+++ b/site/content/docs/main/file-system-backup.md
@@ -111,24 +111,10 @@ To mount the correct hostpath to pods volumes, run the node-agent pod in `privil
     oc adm policy add-scc-to-user privileged -z velero -n velero
     ```
 
-2. Modify the DaemonSet yaml to request a privileged mode:
-
-    ```diff
-    @@ -67,3 +67,5 @@ spec:
-                  value: /credentials/cloud
-                - name: VELERO_SCRATCH_DIR
-                  value: /scratch
-    +          securityContext:
-    +            privileged: true
+2. Install Velero with the '--privileged-node-agent' option to request a privileged mode:
+  
     ```
-
-    or
-
-    ```shell
-    oc patch ds/node-agent \
-      --namespace velero \
-      --type json \
-      -p '[{"op":"add","path":"/spec/template/spec/containers/0/securityContext","value": { "privileged": true}}]'
+    velero install --use-node-agent --privileged-node-agent
     ```
 
 
diff --git a/tilt-resources/examples/node-agent.yaml b/tilt-resources/examples/node-agent.yaml
index d5c10fc47..835ba297f 100644
--- a/tilt-resources/examples/node-agent.yaml
+++ b/tilt-resources/examples/node-agent.yaml
@@ -49,6 +49,9 @@ spec:
             - mountPath: /host_pods
               mountPropagation: HostToContainer
               name: host-pods
+            - mountPath: /var/lib/kubelet/plugins
+              mountPropagation: HostToContainer
+              name: host-plugins              
             - mountPath: /scratch
               name: scratch
             - mountPath: /credentials
@@ -60,6 +63,9 @@ spec:
         - hostPath:
             path: /var/lib/kubelet/pods
           name: host-pods
+        - hostPath:
+            path: /var/lib/kubelet/plugins
+          name: host-plugins          
         - emptyDir: {}
           name: scratch
         - name: cloud-credentials