Merge branch 'main' into vgdp-ms-pvb-controller

pull/9015/head
Lyndon-Li 2025-06-11 17:29:04 +08:00
commit 5b9904832d
9 changed files with 1448 additions and 0 deletions

View File

@ -0,0 +1 @@
Fix issue #8988, add data path for VGDP ms PVR

View File

@ -77,6 +77,11 @@ spec:
BackupStorageLocation is the name of the backup storage location
where the backup repository is stored.
type: string
cancel:
description: |-
Cancel indicates request to cancel the ongoing PodVolumeRestore. It can be set
when the PodVolumeRestore is in InProgress phase
type: boolean
pod:
description: Pod is a reference to the pod containing the volume to
be restored.

View File

@ -54,6 +54,10 @@ type PodVolumeRestoreSpec struct {
// +optional
// +nullable
UploaderSettings map[string]string `json:"uploaderSettings,omitempty"`
// Cancel indicates request to cancel the ongoing PodVolumeRestore. It can be set
// when the PodVolumeRestore is in InProgress phase
Cancel bool `json:"cancel,omitempty"`
}
// PodVolumeRestorePhase represents the lifecycle phase of a PodVolumeRestore.

View File

@ -97,3 +97,9 @@ func (b *PodVolumeRestoreBuilder) UploaderType(uploaderType string) *PodVolumeRe
b.object.Spec.UploaderType = uploaderType
return b
}
// OwnerReference sets the OwnerReferences for this PodVolumeRestore. The given
// slice replaces whatever owner references the object under construction had.
func (b *PodVolumeRestoreBuilder) OwnerReference(ownerRef []metav1.OwnerReference) *PodVolumeRestoreBuilder {
	b.object.OwnerReferences = ownerRef
	return b
}

View File

@ -31,6 +31,7 @@ func NewCommand(f client.Factory) *cobra.Command {
command.AddCommand(
NewBackupCommand(f),
NewRestoreCommand(f),
)
return command

View File

@ -0,0 +1,298 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podvolume
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/bombsimon/logrusr/v3"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/podvolume"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
)
// podVolumeRestoreConfig holds the values bound to the flags of the
// "restore" command.
type podVolumeRestoreConfig struct {
	// volumePath is bound to --volume-path: the full path of the volume to be restored.
	volumePath string
	// pvrName is bound to --pod-volume-restore: the PVR name.
	pvrName string
	// resourceTimeout is bound to --resource-timeout; the command rejects values <= 0.
	resourceTimeout time.Duration
}
// NewRestoreCommand builds the hidden "restore" sub-command. All three of
// --volume-path, --pod-volume-restore and --resource-timeout are required
// and are validated before the command body runs.
func NewRestoreCommand(f client.Factory) *cobra.Command {
	var (
		cfg       podVolumeRestoreConfig
		levelFlag = logging.LogLevelFlag(logrus.InfoLevel)
		fmtFlag   = logging.NewFormatFlag()
	)

	// validate rejects flag values that are syntactically present but unusable.
	validate := func(_ *cobra.Command, _ []string) error {
		switch {
		case cfg.resourceTimeout <= 0:
			return errors.New("resource-timeout must be greater than 0")
		case cfg.volumePath == "":
			return errors.New("volume-path cannot be empty")
		case cfg.pvrName == "":
			return errors.New("pod-volume-restore name cannot be empty")
		}
		return nil
	}

	// execute sets up logging, creates the restore runner and runs it.
	execute := func(c *cobra.Command, _ []string) {
		level := levelFlag.Parse()
		logrus.Infof("Setting log-level to %s", strings.ToUpper(level.String()))

		logger := logging.DefaultLogger(level, fmtFlag.Parse())
		logger.Infof("Starting Velero pod volume restore %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())

		f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))

		runner, err := newPodVolumeRestore(logger, f, cfg)
		if err != nil {
			kube.ExitPodWithMessage(logger, false, "Failed to create pod volume restore, %v", err)
		}

		runner.run()
	}

	command := &cobra.Command{
		Use:     "restore",
		Short:   "Run the velero pod volume restore",
		Long:    "Run the velero pod volume restore",
		Hidden:  true,
		PreRunE: validate,
		Run:     execute,
	}

	flags := command.Flags()
	flags.Var(levelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(levelFlag.AllowedValues(), ", ")))
	flags.Var(fmtFlag, "log-format", fmt.Sprintf("The format for log output. Valid values are %s.", strings.Join(fmtFlag.AllowedValues(), ", ")))
	flags.StringVar(&cfg.volumePath, "volume-path", cfg.volumePath, "The full path of the volume to be restored")
	flags.StringVar(&cfg.pvrName, "pod-volume-restore", cfg.pvrName, "The PVR name")
	flags.DurationVar(&cfg.resourceTimeout, "resource-timeout", cfg.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters.")

	// The errors are ignored on purpose: the flag names are the ones registered just above.
	for _, required := range []string{"volume-path", "pod-volume-restore", "resource-timeout"} {
		_ = command.MarkFlagRequired(required)
	}

	return command
}
// podVolumeRestore carries everything the "restore" command needs to run the
// data path for a single PodVolumeRestore (PVR).
type podVolumeRestore struct {
	logger logrus.FieldLogger
	// ctx and cancelFunc are created together in newPodVolumeRestore; ctx is
	// handed to the cache and to the data path service.
	ctx        context.Context
	cancelFunc context.CancelFunc
	// client is a direct controller-runtime client; cache provides informers.
	client ctlclient.Client
	cache  ctlcache.Cache
	// namespace comes from the client factory; nodeName from the NODE_NAME env var.
	namespace   string
	nodeName    string
	config      podVolumeRestoreConfig
	kubeClient  kubernetes.Interface
	dataPathMgr *datapath.Manager
}
// newPodVolumeRestore assembles the runner for the "restore" command: it builds
// the REST config, scheme, direct client, informer cache and kube client.
// On any failure the freshly created context is cancelled before the wrapped
// error is returned.
func newPodVolumeRestore(logger logrus.FieldLogger, factory client.Factory, config podVolumeRestoreConfig) (*podVolumeRestore, error) {
	ctx, cancelFunc := context.WithCancel(context.Background())

	// abort releases the context and returns the cause wrapped with msg.
	abort := func(cause error, msg string) (*podVolumeRestore, error) {
		cancelFunc()
		return nil, errors.Wrap(cause, msg)
	}

	clientConfig, err := factory.ClientConfig()
	if err != nil {
		return abort(err, "error to create client config")
	}

	ctrl.SetLogger(logrusr.New(logger))
	klog.SetLogger(logrusr.New(logger)) // klog.Logger is used by k8s.io/client-go

	scheme := runtime.NewScheme()
	if schemeErr := velerov1api.AddToScheme(scheme); schemeErr != nil {
		return abort(schemeErr, "error to add velero v1 scheme")
	}
	if schemeErr := corev1api.AddToScheme(scheme); schemeErr != nil {
		return abort(schemeErr, "error to add core v1 scheme")
	}

	nodeName := os.Getenv("NODE_NAME")

	// Field selectors narrow the cache: pods are limited to this node and
	// PodVolumeRestores to the factory's namespace.
	podSelector := fields.Set{"spec.nodeName": nodeName}.AsSelector()
	pvrSelector := fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector()
	cacheOption := ctlcache.Options{
		Scheme: scheme,
		ByObject: map[ctlclient.Object]ctlcache.ByObject{
			&corev1api.Pod{}:                 {Field: podSelector},
			&velerov1api.PodVolumeRestore{}: {Field: pvrSelector},
		},
	}

	cli, err := ctlclient.New(clientConfig, ctlclient.Options{Scheme: scheme})
	if err != nil {
		return abort(err, "error to create client")
	}

	// Up to 10 attempts, one second apart; no warning or sleep after the last one.
	var cache ctlcache.Cache
	for attemptsLeft := 10; attemptsLeft > 0; attemptsLeft-- {
		cache, err = ctlcache.New(clientConfig, cacheOption)
		if err == nil || attemptsLeft == 1 {
			break
		}

		logger.WithError(err).Warn("Failed to create client cache, need retry")
		time.Sleep(time.Second)
	}
	if err != nil {
		return abort(err, "error to create client cache")
	}

	s := &podVolumeRestore{
		logger:     logger,
		ctx:        ctx,
		cancelFunc: cancelFunc,
		client:     cli,
		cache:      cache,
		config:     config,
		namespace:  factory.Namespace(),
		nodeName:   nodeName,
	}

	if s.kubeClient, err = factory.KubeClient(); err != nil {
		return abort(err, "error to create kube client")
	}

	s.dataPathMgr = datapath.NewManager(1)

	return s, nil
}
// funcCreateDataPathRestore is the factory runDataPath uses to obtain the data
// path service. It is a package-level variable so it can be reassigned.
var funcCreateDataPathRestore = (*podVolumeRestore).createDataPathService
// run wires shutdown handling to the cancel function, starts the informer
// cache in the background and then runs the data path on the calling goroutine.
func (s *podVolumeRestore) run() {
	signals.CancelOnShutdown(s.cancelFunc, s.logger)
	go func() {
		// A cache start failure is only logged here; it is not propagated.
		if err := s.cache.Start(s.ctx); err != nil {
			s.logger.WithError(err).Warn("error starting cache")
		}
	}()
	s.runDataPath()
}
// runDataPath drives the data path service through create, init and run, and
// reports the outcome through funcExitWithMessage. Once the service has been
// created it is always shut down before the context is cancelled and the
// outcome is reported.
func (s *podVolumeRestore) runDataPath() {
	pvrName := s.config.pvrName
	s.logger.Infof("Starting micro service in node %s for PVR %s", s.nodeName, pvrName)

	// fail cancels the context and reports a failure built from format, the PVR name and cause.
	fail := func(format string, cause error) {
		s.cancelFunc()
		funcExitWithMessage(s.logger, false, format, pvrName, cause)
	}

	dpService, err := funcCreateDataPathRestore(s)
	if err != nil {
		fail("Failed to create data path service for PVR %s: %v", err)
		return
	}

	s.logger.Infof("Starting data path service %s", pvrName)

	if err = dpService.Init(); err != nil {
		dpService.Shutdown()
		fail("Failed to init data path service for PVR %s: %v", err)
		return
	}

	s.logger.Infof("Running data path service %s", pvrName)

	result, err := dpService.RunCancelableDataPath(s.ctx)
	if err != nil {
		dpService.Shutdown()
		fail("Failed to run data path service for PVR %s: %v", err)
		return
	}

	pvrLog := s.logger.WithField("PVR", pvrName)
	pvrLog.Info("Data path service completed")

	dpService.Shutdown()
	pvrLog.Info("Data path service is shut down")

	s.cancelFunc()
	funcExitWithMessage(s.logger, true, result)
}
// createDataPathService builds the restore micro service for the configured
// PVR: it creates the file and secret credential stores, obtains the
// PodVolumeRestore informer from the cache and creates a repository ensurer.
// The first failing step is returned wrapped with a message naming that step.
func (s *podVolumeRestore) createDataPathService() (dataPathService, error) {
	fileStore, err := funcNewCredentialFileStore(s.client, s.namespace, credentials.DefaultStoreDirectory(), filesystem.NewFileSystem())
	if err != nil {
		return nil, errors.Wrap(err, "error to create credential file store")
	}

	secretStore, err := funcNewCredentialSecretStore(s.client, s.namespace)
	if err != nil {
		return nil, errors.Wrap(err, "error to create credential secret store")
	}

	pvrInformer, err := s.cache.GetInformer(s.ctx, &velerov1api.PodVolumeRestore{})
	if err != nil {
		return nil, errors.Wrap(err, "error to get controller-runtime informer from manager")
	}

	repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout)

	credGetter := &credentials.CredentialGetter{
		FromFile:   fileStore,
		FromSecret: secretStore,
	}

	// The restore target is the volume path given on the command line, in filesystem mode.
	target := datapath.AccessPoint{
		ByPath:  s.config.volumePath,
		VolMode: uploader.PersistentVolumeFilesystem,
	}

	svc := podvolume.NewRestoreMicroService(s.ctx, s.client, s.kubeClient, s.config.pvrName, s.namespace, s.nodeName,
		target, s.dataPathMgr, repoEnsurer, credGetter, pvrInformer, s.logger)

	return svc, nil
}

View File

@ -0,0 +1,166 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podvolume
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
cacheMock "github.com/vmware-tanzu/velero/pkg/cmd/cli/datamover/mocks"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
// fakeCreateRestoreDataPathServiceWithErr is a stand-in for
// funcCreateDataPathRestore that always fails with a fixed error.
func fakeCreateRestoreDataPathServiceWithErr(_ *podVolumeRestore) (dataPathService, error) {
	return nil, errors.New("fake-create-data-path-error")
}
// fakeCreateRestoreDataPathService is a stand-in for funcCreateDataPathRestore
// that returns the package-level frHelper as the data path service.
func fakeCreateRestoreDataPathService(_ *podVolumeRestore) (dataPathService, error) {
	return frHelper, nil
}
// TestRunRestoreDataPath exercises runDataPath for a failure at each stage
// (create, init, run) and for the success path, checking the message and the
// succeed flag captured by the fake run helper.
func TestRunRestoreDataPath(t *testing.T) {
	tests := []struct {
		name                        string
		pvrName                     string
		createDataPathFail          bool
		initDataPathErr             error
		runCancelableDataPathErr    error
		runCancelableDataPathResult string
		expectedMessage             string
		expectedSucceed             bool
	}{
		{
			name:               "create data path failed",
			pvrName:            "fake-name",
			createDataPathFail: true,
			expectedMessage:    "Failed to create data path service for PVR fake-name: fake-create-data-path-error",
		},
		{
			name:            "init data path failed",
			pvrName:         "fake-name",
			initDataPathErr: errors.New("fake-init-data-path-error"),
			expectedMessage: "Failed to init data path service for PVR fake-name: fake-init-data-path-error",
		},
		{
			name:                     "run data path failed",
			pvrName:                  "fake-name",
			runCancelableDataPathErr: errors.New("fake-run-data-path-error"),
			expectedMessage:          "Failed to run data path service for PVR fake-name: fake-run-data-path-error",
		},
		{
			name:                        "succeed",
			pvrName:                     "fake-name",
			runCancelableDataPathResult: "fake-run-data-path-result",
			expectedMessage:             "fake-run-data-path-result",
			expectedSucceed:             true,
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// frHelper is package-level state shared with the fake factories.
			frHelper = &fakeRunHelper{
				initErr:                     test.initDataPathErr,
				runCancelableDataPathErr:    test.runCancelableDataPathErr,
				runCancelableDataPathResult: test.runCancelableDataPathResult,
			}
			if test.createDataPathFail {
				funcCreateDataPathRestore = fakeCreateRestoreDataPathServiceWithErr
			} else {
				funcCreateDataPathRestore = fakeCreateRestoreDataPathService
			}
			// NOTE(review): the package-level hooks overridden here are not restored after the subtest.
			funcExitWithMessage = frHelper.ExitWithMessage
			s := &podVolumeRestore{
				logger:     velerotest.NewLogger(),
				cancelFunc: func() {},
				config: podVolumeRestoreConfig{
					pvrName: test.pvrName,
				},
			}
			s.runDataPath()
			assert.Equal(t, test.expectedMessage, frHelper.exitMessage)
			assert.Equal(t, test.expectedSucceed, frHelper.succeed)
		})
	}
}
// TestCreateRestoreDataPathService checks that createDataPathService reports
// the expected wrapped error when the file store, the secret store or the
// informer lookup fails, and returns no error otherwise.
func TestCreateRestoreDataPathService(t *testing.T) {
	tests := []struct {
		name            string
		fileStoreErr    error
		secretStoreErr  error
		mockGetInformer bool
		getInformerErr  error
		expectedError   string
	}{
		{
			name:          "create credential file store error",
			fileStoreErr:  errors.New("fake-file-store-error"),
			expectedError: "error to create credential file store: fake-file-store-error",
		},
		{
			name:           "create credential secret store",
			secretStoreErr: errors.New("fake-secret-store-error"),
			expectedError:  "error to create credential secret store: fake-secret-store-error",
		},
		{
			name:            "get informer error",
			mockGetInformer: true,
			getInformerErr:  errors.New("fake-get-informer-error"),
			expectedError:   "error to get controller-runtime informer from manager: fake-get-informer-error",
		},
		{
			name:            "succeed",
			mockGetInformer: true,
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			fcHelper := &fakeCreateDataPathServiceHelper{
				fileStoreErr:   test.fileStoreErr,
				secretStoreErr: test.secretStoreErr,
			}
			funcNewCredentialFileStore = fcHelper.NewNamespacedFileStore
			funcNewCredentialSecretStore = fcHelper.NewNamespacedSecretStore
			cache := cacheMock.NewCache(t)
			// GetInformer is only mocked for cases that get past both credential stores.
			if test.mockGetInformer {
				cache.On("GetInformer", mock.Anything, mock.Anything).Return(nil, test.getInformerErr)
			}
			// NOTE(review): frHelper is not assigned in this test; it holds whatever value it had before.
			funcExitWithMessage = frHelper.ExitWithMessage
			s := &podVolumeRestore{
				cache: cache,
			}
			_, err := s.createDataPathService()
			if test.expectedError != "" {
				assert.EqualError(t, err, test.expectedError)
			} else {
				assert.NoError(t, err)
			}
		})
	}
}

View File

@ -0,0 +1,345 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podvolume
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/kube"
cachetool "k8s.io/client-go/tools/cache"
)
// RestoreMicroService processes a single PodVolumeRestore (PVR): it waits for
// the PVR to become InProgress, runs the file-system restore through the data
// path manager and reports progress and results through events.
type RestoreMicroService struct {
	// ctx is the context given at construction; Shutdown and cancellation use it.
	ctx              context.Context
	client           client.Client
	kubeClient       kubernetes.Interface
	repoEnsurer      *repository.Ensurer
	credentialGetter *credentials.CredentialGetter
	logger           logrus.FieldLogger
	dataPathMgr      *datapath.Manager
	// eventRecorder is created in Init.
	eventRecorder kube.EventRecorder
	namespace     string
	pvrName       string
	// pvr is set by RunCancelableDataPath once the PVR is found InProgress.
	pvr *velerov1api.PodVolumeRestore
	// sourceTargetPath is the access point passed to StartRestore.
	sourceTargetPath datapath.AccessPoint
	// resultSignal carries the final outcome from the data path callbacks to
	// RunCancelableDataPath.
	resultSignal chan dataPathResult
	// pvrInformer delivers PVR updates; pvrHandler is the registration returned
	// by Init and removed again in Shutdown.
	pvrInformer cache.Informer
	pvrHandler  cachetool.ResourceEventHandlerRegistration
	nodeName    string
}
// NewRestoreMicroService returns a RestoreMicroService for the named PVR.
// The result channel is created unbuffered. The event recorder and the informer
// handler are not set up here; Init does that.
func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, pvrName string, namespace string, nodeName string,
	sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter,
	pvrInformer cache.Informer, log logrus.FieldLogger) *RestoreMicroService {
	svc := new(RestoreMicroService)

	// Identity of the PVR being served.
	svc.pvrName = pvrName
	svc.namespace = namespace
	svc.nodeName = nodeName
	svc.sourceTargetPath = sourceTargetPath

	// Collaborators supplied by the caller.
	svc.ctx = ctx
	svc.client = client
	svc.kubeClient = kubeClient
	svc.repoEnsurer = repoEnsurer
	svc.credentialGetter = cred
	svc.dataPathMgr = dataPathMgr
	svc.pvrInformer = pvrInformer
	svc.logger = log

	svc.resultSignal = make(chan dataPathResult)

	return svc
}
// Init creates the event recorder and registers an update handler on the PVR
// informer. The handler triggers cancellation when the watched PVR is
// InProgress and its Spec.Cancel flips from false to true.
//
// It returns an error if the handler cannot be registered.
func (r *RestoreMicroService) Init() error {
	r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.pvrName, r.nodeName, r.logger)

	handler, err := r.pvrInformer.AddEventHandler(
		cachetool.ResourceEventHandlerFuncs{
			UpdateFunc: func(oldObj any, newObj any) {
				// Use checked assertions: a panic inside an informer callback
				// would take down the whole process.
				oldPvr, ok := oldObj.(*velerov1api.PodVolumeRestore)
				if !ok {
					r.logger.Warnf("Ignoring PVR update with unexpected old object type %T", oldObj)
					return
				}

				newPvr, ok := newObj.(*velerov1api.PodVolumeRestore)
				if !ok {
					r.logger.Warnf("Ignoring PVR update with unexpected new object type %T", newObj)
					return
				}

				// The informer delivers updates for other PVRs too.
				if newPvr.Name != r.pvrName {
					return
				}

				if newPvr.Status.Phase != velerov1api.PodVolumeRestorePhaseInProgress {
					return
				}

				// Only react to the transition, not to every update while Cancel is set.
				if newPvr.Spec.Cancel && !oldPvr.Spec.Cancel {
					r.cancelPodVolumeRestore(newPvr)
				}
			},
		},
	)
	if err != nil {
		return errors.Wrap(err, "error adding PVR handler")
	}

	r.pvrHandler = handler

	return nil
}
// RunCancelableDataPath waits until the PVR exists and is InProgress, starts
// the asynchronous file-system restore and blocks until a result arrives on
// resultSignal or ctx is done.
//
// It returns the result string from the data path callbacks, or an error if
// waiting, creating, initializing or starting the data path fails, if the
// callbacks report an error, or if ctx ends first.
func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
	log := r.logger.WithField("PVR", r.pvrName)

	pvrKey := types.NamespacedName{
		Namespace: r.namespace,
		Name:      r.pvrName,
	}

	// Poll every 500ms; a missing PVR or one in another phase keeps the poll going.
	pvr := &velerov1api.PodVolumeRestore{}
	waitErr := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) {
		getErr := r.client.Get(ctx, pvrKey, pvr)
		switch {
		case apierrors.IsNotFound(getErr):
			return false, nil
		case getErr != nil:
			return true, errors.Wrapf(getErr, "error to get PVR %s", r.pvrName)
		}

		return pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress, nil
	})
	if waitErr != nil {
		log.WithError(waitErr).Error("Failed to wait PVR")
		return "", errors.Wrap(waitErr, "error waiting for PVR")
	}

	r.pvr = pvr

	log.Info("Run cancelable PVR")

	callbacks := datapath.Callbacks{
		OnCompleted: r.OnPvrCompleted,
		OnFailed:    r.OnPvrFailed,
		OnCancelled: r.OnPvrCancelled,
		OnProgress:  r.OnPvrProgress,
	}

	fsRestore, err := r.dataPathMgr.CreateFileSystemBR(pvr.Name, podVolumeRequestor, ctx, r.client, pvr.Namespace, callbacks, log)
	if err != nil {
		return "", errors.Wrap(err, "error to create data path")
	}

	log.Debug("Async fs br created")

	initParam := &datapath.FSBRInitParam{
		BSLName:           pvr.Spec.BackupStorageLocation,
		SourceNamespace:   pvr.Spec.SourceNamespace,
		UploaderType:      pvr.Spec.UploaderType,
		RepositoryType:    velerov1api.BackupRepositoryTypeKopia,
		RepoIdentifier:    "",
		RepositoryEnsurer: r.repoEnsurer,
		CredentialGetter:  r.credentialGetter,
	}
	if err := fsRestore.Init(ctx, initParam); err != nil {
		return "", errors.Wrap(err, "error to initialize data path")
	}

	log.Info("Async fs br init")

	if err := fsRestore.StartRestore(pvr.Spec.SnapshotID, r.sourceTargetPath, pvr.Spec.UploaderSettings); err != nil {
		return "", errors.Wrap(err, "error starting data path restore")
	}

	log.Info("Async fs restore data path started")
	r.eventRecorder.Event(pvr, false, datapath.EventReasonStarted, "Data path for %s started", pvr.Name)

	// Block until the callbacks deliver an outcome or the context ends.
	var (
		result string
		runErr error
	)
	select {
	case <-ctx.Done():
		runErr = errors.New("timed out waiting for fs restore to complete")
	case outcome := <-r.resultSignal:
		runErr, result = outcome.err, outcome.result
	}

	if runErr != nil {
		log.WithError(runErr).Error("Async fs restore was not completed")
	}

	r.eventRecorder.EndingEvent(pvr, false, datapath.EventReasonStopped, "Data path for %s stopped", pvr.Name)

	return result, runErr
}
// Shutdown stops the event recorder, closes and removes the data path for the
// PVR and, if Init registered one, removes the PVR informer handler. A failure
// to remove the handler is only logged.
func (r *RestoreMicroService) Shutdown() {
	r.eventRecorder.Shutdown()

	r.closeDataPath(r.ctx, r.pvrName)

	if r.pvrHandler != nil {
		if err := r.pvrInformer.RemoveEventHandler(r.pvrHandler); err != nil {
			// The handler being removed is the PVR handler registered in Init, not a pod handler.
			r.logger.WithError(err).Warn("Failed to remove PVR handler")
		}
	}
}
// funcWriteCompletionMark is the function OnPvrCompleted calls to write the
// done file. It is a package-level variable so it can be reassigned.
var funcWriteCompletionMark = writeCompletionMark
// OnPvrCompleted is the data path callback for a finished restore. It writes
// the completion mark (a failure there is only logged), marshals the restore
// result and then either emits a Completed event and sends the marshalled
// result on resultSignal, or reports the marshal failure through
// recordPvrFailed.
func (r *RestoreMicroService) OnPvrCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) {
	log := r.logger.WithField("PVR", pvrName)

	err := funcWriteCompletionMark(r.pvr, result.Restore, log)
	if err != nil {
		log.WithError(err).Warnf("Failed to write completion mark, restored pod may failed to start")
	}

	restoreBytes, err := funcMarshal(result.Restore)
	if err != nil {
		log.WithError(err).Errorf("Failed to marshal restore result %v", result.Restore)
		r.recordPvrFailed(fmt.Sprintf("error marshaling restore result %v", result.Restore), err)
	} else {
		// Event takes a printf-style message; pass the payload as an argument
		// so a '%' inside it (e.g. in a path) cannot be read as a verb.
		r.eventRecorder.Event(r.pvr, false, datapath.EventReasonCompleted, "%s", string(restoreBytes))
		r.resultSignal <- dataPathResult{
			result: string(restoreBytes),
		}
	}

	log.Info("Async fs restore data path completed")
}
// recordPvrFailed emits a Failed event for the PVR and sends err, wrapped with
// msg, on resultSignal. The send blocks until the result is received.
func (r *RestoreMicroService) recordPvrFailed(msg string, err error) {
	// msg is caller-built text, so it is passed as an argument rather than as
	// the format string; a '%' inside it must not be read as a verb.
	r.eventRecorder.Event(r.pvr, false, datapath.EventReasonFailed, "%s, error %v", msg, err)

	r.resultSignal <- dataPathResult{
		// Wrap, not Wrapf, for the same reason: msg is not a format string.
		err: errors.Wrap(err, msg),
	}
}
// OnPvrFailed is the data path callback for a failed restore. It logs the
// error and reports the failure through recordPvrFailed. The ctx and namespace
// parameters are not used.
func (r *RestoreMicroService) OnPvrFailed(ctx context.Context, namespace string, pvrName string, err error) {
	log := r.logger.WithField("PVR", pvrName)
	log.WithError(err).Error("Async fs restore data path failed")
	r.recordPvrFailed(fmt.Sprintf("Data path for PVR %s failed", pvrName), err)
}
// OnPvrCancelled is the data path callback for a cancelled restore. It emits a
// Cancelled event and sends an error built from datapath.ErrCancelled on
// resultSignal. The ctx and namespace parameters are not used.
func (r *RestoreMicroService) OnPvrCancelled(ctx context.Context, namespace string, pvrName string) {
	log := r.logger.WithField("PVR", pvrName)
	log.Warn("Async fs restore data path canceled")
	r.eventRecorder.Event(r.pvr, false, datapath.EventReasonCancelled, "Data path for PVR %s canceled", pvrName)
	// This send blocks until the result is received.
	r.resultSignal <- dataPathResult{
		err: errors.New(datapath.ErrCancelled),
	}
}
// OnPvrProgress is the data path callback for progress updates. It marshals
// the progress and emits it as a Progress event. If marshalling fails the
// error is logged and no event is emitted.
func (r *RestoreMicroService) OnPvrProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) {
	log := r.logger.WithFields(logrus.Fields{
		"PVR": pvrName,
	})

	progressBytes, err := funcMarshal(progress)
	if err != nil {
		log.WithError(err).Errorf("Failed to marshal progress %v", progress)
		return
	}

	// Event takes a printf-style message; pass the payload as an argument so a
	// '%' inside it cannot be read as a verb.
	r.eventRecorder.Event(r.pvr, false, datapath.EventReasonProgress, "%s", string(progressBytes))
}
// closeDataPath closes the async backup/restore registered under pvrName, if
// there is one, and then removes the entry from the data path manager.
func (r *RestoreMicroService) closeDataPath(ctx context.Context, pvrName string) {
	if asyncBR := r.dataPathMgr.GetAsyncBR(pvrName); asyncBR != nil {
		asyncBR.Close(ctx)
	}

	// Removal is attempted whether or not an instance was found.
	r.dataPathMgr.RemoveAsyncBR(pvrName)
}
// cancelPodVolumeRestore handles a cancel request for pvr. If a data path is
// registered for it, that data path is cancelled; otherwise the cancelled
// outcome is reported directly through OnPvrCancelled.
func (r *RestoreMicroService) cancelPodVolumeRestore(pvr *velerov1api.PodVolumeRestore) {
	r.logger.WithField("PVR", pvr.Name).Info("PVR is being canceled")

	r.eventRecorder.Event(pvr, false, datapath.EventReasonCancelling, "Canceling for PVR %s", pvr.Name)

	if fsRestore := r.dataPathMgr.GetAsyncBR(pvr.Name); fsRestore != nil {
		fsRestore.Cancel()
		return
	}

	// No data path is registered, so report the cancellation here.
	r.OnPvrCancelled(r.ctx, pvr.GetNamespace(), pvr.GetName())
}
// File-system hooks used by writeCompletionMark. They default to the os
// package functions and are variables so they can be reassigned.
var (
	funcRemoveAll = os.RemoveAll
	funcMkdirAll  = os.MkdirAll
	funcWriteFile = os.WriteFile
)
// writeCompletionMark writes an empty done file named after the UID of the
// PVR's first owner reference into <volume>/.velero, where <volume> is the
// target path in the restore result.
//
// It returns an error if the target path is empty, if the PVR has no owner
// references, or if creating the directory or writing the file fails.
func writeCompletionMark(pvr *velerov1api.PodVolumeRestore, result datapath.RestoreResult, log logrus.FieldLogger) error {
	volumePath := result.Target.ByPath
	if volumePath == "" {
		return errors.New("target volume is empty in restore result")
	}

	markDir := filepath.Join(volumePath, ".velero")

	// Remove the .velero directory from the restored volume (it may contain done files from previous restores
	// of this volume, which we don't want to carry over). If this fails for any reason, log and continue, since
	// this is non-essential cleanup (the done files are named based on restore UID and the init container looks
	// for the one specific to the restore being executed).
	if rmErr := funcRemoveAll(markDir); rmErr != nil {
		log.WithError(rmErr).Warnf("Failed to remove .velero directory from directory %s", volumePath)
	}

	owners := pvr.OwnerReferences
	if len(owners) == 0 {
		return errors.New("error finding restore UID")
	}
	restoreUID := string(owners[0].UID)

	// Create the .velero directory within the volume dir so we can write a done file
	// for this restore.
	if mkErr := funcMkdirAll(markDir, 0755); mkErr != nil {
		return errors.Wrap(mkErr, "error creating .velero directory for done file")
	}

	// Write a done file with name=<restore-uid> into the just-created .velero dir
	// within the volume. The velero init container on the pod is waiting
	// for this file to exist in each restored volume before completing.
	doneFile := filepath.Join(volumePath, ".velero", restoreUID)
	if wrErr := funcWriteFile(doneFile, nil, 0644); wrErr != nil {
		return errors.Wrap(wrErr, "error writing done file")
	}

	return nil
}

View File

@ -0,0 +1,622 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podvolume
import (
"context"
"fmt"
"os"
"sync"
"testing"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/runtime"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/uploader"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// restoreMsTestHelper is a test double providing an event recorder, a marshal
// function and a completion-mark writer. It remembers the last event it saw.
type restoreMsTestHelper struct {
	eventReason        string
	eventMsg           string
	marshalErr         error
	marshalBytes       []byte
	withEvent          bool
	eventLock          sync.Mutex
	writeCompletionErr error
}

// capture stores the reason and the formatted message of an event under the lock.
func (rt *restoreMsTestHelper) capture(reason string, message string, a []any) {
	rt.eventLock.Lock()
	defer rt.eventLock.Unlock()

	rt.withEvent = true
	rt.eventReason = reason
	rt.eventMsg = fmt.Sprintf(message, a...)
}

// Event records the event's reason and formatted message.
func (rt *restoreMsTestHelper) Event(_ runtime.Object, _ bool, reason string, message string, a ...any) {
	rt.capture(reason, message, a)
}

// EndingEvent records the event exactly as Event does.
func (rt *restoreMsTestHelper) EndingEvent(_ runtime.Object, _ bool, reason string, message string, a ...any) {
	rt.capture(reason, message, a)
}

// Shutdown does nothing.
func (rt *restoreMsTestHelper) Shutdown() {}

// Marshal ignores its argument and returns the configured error if set,
// otherwise the configured bytes.
func (rt *restoreMsTestHelper) Marshal(v any) ([]byte, error) {
	if rt.marshalErr == nil {
		return rt.marshalBytes, nil
	}

	return nil, rt.marshalErr
}

// EventReason returns the reason of the last recorded event.
func (rt *restoreMsTestHelper) EventReason() string {
	rt.eventLock.Lock()
	reason := rt.eventReason
	rt.eventLock.Unlock()

	return reason
}

// EventMessage returns the formatted message of the last recorded event.
func (rt *restoreMsTestHelper) EventMessage() string {
	rt.eventLock.Lock()
	msg := rt.eventMsg
	rt.eventLock.Unlock()

	return msg
}

// WriteCompletionMark ignores its arguments and returns the configured error.
func (rt *restoreMsTestHelper) WriteCompletionMark(*velerov1api.PodVolumeRestore, datapath.RestoreResult, logrus.FieldLogger) error {
	return rt.writeCompletionErr
}
// TestOnPvrFailed checks that OnPvrFailed delivers a wrapped error on
// resultSignal and records a Failed event with the expected message.
func TestOnPvrFailed(t *testing.T) {
	pvrName := "fake-pvr"
	rt := &restoreMsTestHelper{}
	rs := &RestoreMicroService{
		pvrName:       pvrName,
		dataPathMgr:   datapath.NewManager(1),
		eventRecorder: rt,
		resultSignal:  make(chan dataPathResult),
		logger:        velerotest.NewLogger(),
	}
	expectedErr := "Data path for PVR fake-pvr failed: fake-error"
	expectedEventReason := datapath.EventReasonFailed
	expectedEventMsg := "Data path for PVR fake-pvr failed, error fake-error"
	// The callback sends on the unbuffered resultSignal, so it runs in its own
	// goroutine while the test receives.
	go rs.OnPvrFailed(context.TODO(), velerov1api.DefaultNamespace, pvrName, errors.New("fake-error"))
	result := <-rs.resultSignal
	assert.EqualError(t, result.err, expectedErr)
	assert.Equal(t, expectedEventReason, rt.EventReason())
	assert.Equal(t, expectedEventMsg, rt.EventMessage())
}
// TestPvrCancelled checks that OnPvrCancelled delivers the cancelled error on
// resultSignal and records a Cancelled event with the expected message.
func TestPvrCancelled(t *testing.T) {
	pvrName := "fake-pvr"
	rt := &restoreMsTestHelper{}
	rs := RestoreMicroService{
		pvrName:       pvrName,
		dataPathMgr:   datapath.NewManager(1),
		eventRecorder: rt,
		resultSignal:  make(chan dataPathResult),
		logger:        velerotest.NewLogger(),
	}
	expectedErr := datapath.ErrCancelled
	expectedEventReason := datapath.EventReasonCancelled
	expectedEventMsg := "Data path for PVR fake-pvr canceled"
	// The callback sends on the unbuffered resultSignal, so it runs in its own
	// goroutine while the test receives.
	go rs.OnPvrCancelled(context.TODO(), velerov1api.DefaultNamespace, pvrName)
	result := <-rs.resultSignal
	assert.EqualError(t, result.err, expectedErr)
	assert.Equal(t, expectedEventReason, rt.EventReason())
	assert.Equal(t, expectedEventMsg, rt.EventMessage())
}
// TestOnPvrCompleted covers three outcomes of OnPvrCompleted: a marshal
// failure, plain success, and success where writing the completion mark fails
// and only a warning is logged.
func TestOnPvrCompleted(t *testing.T) {
	tests := []struct {
		name                string
		expectedErr         string
		expectedEventReason string
		expectedEventMsg    string
		marshalErr          error
		marshallStr         string
		writeCompletionErr  error
		expectedLog         string
	}{
		{
			name:        "marshal fail",
			marshalErr:  errors.New("fake-marshal-error"),
			expectedErr: "error marshaling restore result {{ } 0}: fake-marshal-error",
		},
		{
			name:                "succeed",
			marshallStr:         "fake-complete-string",
			expectedEventReason: datapath.EventReasonCompleted,
			expectedEventMsg:    "fake-complete-string",
		},
		{
			name:                "succeed but write completion mark fail",
			marshallStr:         "fake-complete-string",
			writeCompletionErr:  errors.New("fake-write-completion-error"),
			expectedEventReason: datapath.EventReasonCompleted,
			expectedEventMsg:    "fake-complete-string",
			expectedLog:         "Failed to write completion mark, restored pod may failed to start",
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			pvrName := "fake-pvr"
			rt := &restoreMsTestHelper{
				marshalErr:         test.marshalErr,
				marshalBytes:       []byte(test.marshallStr),
				writeCompletionErr: test.writeCompletionErr,
			}
			logBuffer := []string{}
			rs := &RestoreMicroService{
				dataPathMgr:   datapath.NewManager(1),
				eventRecorder: rt,
				resultSignal:  make(chan dataPathResult),
				logger:        velerotest.NewMultipleLogger(&logBuffer),
			}
			// Route marshalling and completion-mark writing through the helper.
			funcMarshal = rt.Marshal
			funcWriteCompletionMark = rt.WriteCompletionMark
			// Both the success and the failure path send on resultSignal.
			go rs.OnPvrCompleted(context.TODO(), velerov1api.DefaultNamespace, pvrName, datapath.Result{})
			result := <-rs.resultSignal
			if test.marshalErr != nil {
				assert.EqualError(t, result.err, test.expectedErr)
			} else {
				assert.NoError(t, result.err)
				assert.Equal(t, test.expectedEventReason, rt.EventReason())
				assert.Equal(t, test.expectedEventMsg, rt.EventMessage())
				if test.expectedLog != "" {
					assert.Contains(t, logBuffer[0], test.expectedLog)
				}
			}
		})
	}
}
// TestOnPvrProgress checks that OnPvrProgress emits a Progress event with the
// marshalled payload, and emits no event when marshalling fails.
func TestOnPvrProgress(t *testing.T) {
	tests := []struct {
		name string
		// NOTE(review): expectedErr is set in a case below but never asserted.
		expectedErr         string
		expectedEventReason string
		expectedEventMsg    string
		marshalErr          error
		marshallStr         string
	}{
		{
			name:        "marshal fail",
			marshalErr:  errors.New("fake-marshal-error"),
			expectedErr: "Failed to marshal restore result",
		},
		{
			name:                "succeed",
			marshallStr:         "fake-progress-string",
			expectedEventReason: datapath.EventReasonProgress,
			expectedEventMsg:    "fake-progress-string",
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			pvrName := "fake-pvr"
			rt := &restoreMsTestHelper{
				marshalErr:   test.marshalErr,
				marshalBytes: []byte(test.marshallStr),
			}
			rs := &RestoreMicroService{
				dataPathMgr:   datapath.NewManager(1),
				eventRecorder: rt,
				logger:        velerotest.NewLogger(),
			}
			funcMarshal = rt.Marshal
			// OnPvrProgress does not use resultSignal, so it is called synchronously.
			rs.OnPvrProgress(context.TODO(), velerov1api.DefaultNamespace, pvrName, &uploader.Progress{})
			if test.marshalErr != nil {
				assert.False(t, rt.withEvent)
			} else {
				assert.True(t, rt.withEvent)
				assert.Equal(t, test.expectedEventReason, rt.EventReason())
				assert.Equal(t, test.expectedEventMsg, rt.EventMessage())
			}
		})
	}
}
// TestCancelPodVolumeRestore exercises cancelPodVolumeRestore on a micro
// service built around a freshly created data path manager, and checks the
// error delivered on resultSignal together with the recorded event.
func TestCancelPodVolumeRestore(t *testing.T) {
	const pvrName = "fake-pvr"

	cases := []struct {
		name                string
		expectedEventReason string
		expectedEventMsg    string
		expectedErr         string
	}{
		{
			name:                "no fs restore",
			expectedEventReason: datapath.EventReasonCancelled,
			expectedEventMsg:    "Data path for PVR fake-pvr canceled",
			expectedErr:         datapath.ErrCancelled,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			target := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result()
			recorder := &restoreMsTestHelper{}

			svc := &RestoreMicroService{
				dataPathMgr:   datapath.NewManager(1),
				eventRecorder: recorder,
				resultSignal:  make(chan dataPathResult),
				logger:        velerotest.NewLogger(),
			}

			// resultSignal is unbuffered, so the cancel call runs in its own
			// goroutine while this goroutine waits to receive the outcome.
			go svc.cancelPodVolumeRestore(target)
			outcome := <-svc.resultSignal

			assert.EqualError(t, outcome.err, tc.expectedErr)
			assert.True(t, recorder.withEvent)
			assert.Equal(t, tc.expectedEventReason, recorder.EventReason())
			assert.Equal(t, tc.expectedEventMsg, recorder.EventMessage())
		})
	}
}
// TestRunCancelableDataPathRestore drives RunCancelableDataPath through its
// failure and success paths: waiting for the PVR, creating, initializing and
// starting the data path, and finally receiving a result from resultSignal.
func TestRunCancelableDataPathRestore(t *testing.T) {
	pvrName := "fake-pvr"
	pvr := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseNew).Result()
	pvrInProgress := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result()

	// A single one-second context is shared by every case that sets
	// ctx: ctxTimeout. Deferring cancel releases it even if the test body
	// panics or exits early.
	ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	tests := []struct {
		name             string
		ctx              context.Context
		result           *dataPathResult
		dataPathMgr      *datapath.Manager
		kubeClientObj    []runtime.Object
		initErr          error
		startErr         error
		dataPathStarted  bool
		expectedEventMsg string
		expectedErr      string
	}{
		{
			name:        "no pvr",
			ctx:         ctxTimeout,
			expectedErr: "error waiting for PVR: context deadline exceeded",
		},
		{
			name:          "pvr not in in-progress",
			ctx:           ctxTimeout,
			kubeClientObj: []runtime.Object{pvr},
			expectedErr:   "error waiting for PVR: context deadline exceeded",
		},
		{
			name:          "create data path fail",
			ctx:           context.Background(),
			kubeClientObj: []runtime.Object{pvrInProgress},
			dataPathMgr:   datapath.NewManager(0),
			expectedErr:   "error to create data path: Concurrent number exceeds",
		},
		{
			name:          "init data path fail",
			ctx:           context.Background(),
			kubeClientObj: []runtime.Object{pvrInProgress},
			initErr:       errors.New("fake-init-error"),
			expectedErr:   "error to initialize data path: fake-init-error",
		},
		{
			name:          "start data path fail",
			ctx:           context.Background(),
			kubeClientObj: []runtime.Object{pvrInProgress},
			startErr:      errors.New("fake-start-error"),
			expectedErr:   "error starting data path restore: fake-start-error",
		},
		{
			name:             "data path timeout",
			ctx:              ctxTimeout,
			kubeClientObj:    []runtime.Object{pvrInProgress},
			dataPathStarted:  true,
			expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvrName),
			expectedErr:      "timed out waiting for fs restore to complete",
		},
		{
			name:            "data path returns error",
			ctx:             context.Background(),
			kubeClientObj:   []runtime.Object{pvrInProgress},
			dataPathStarted: true,
			result: &dataPathResult{
				err: errors.New("fake-data-path-error"),
			},
			expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvrName),
			expectedErr:      "fake-data-path-error",
		},
		{
			name:            "succeed",
			ctx:             context.Background(),
			kubeClientObj:   []runtime.Object{pvrInProgress},
			dataPathStarted: true,
			result: &dataPathResult{
				result: "fake-succeed-result",
			},
			expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvrName),
		},
	}

	// Fail loudly if the Velero types cannot be registered instead of
	// silently continuing with an incomplete scheme.
	scheme := runtime.NewScheme()
	assert.NoError(t, velerov1api.AddToScheme(scheme))

	// datapath.FSBRCreator is package-level state in another package. Restore
	// it on exit so the mock factory does not leak into unrelated tests.
	origFSBRCreator := datapath.FSBRCreator
	defer func() { datapath.FSBRCreator = origFSBRCreator }()

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			fakeClientBuilder := clientFake.NewClientBuilder()
			fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
			fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()

			rt := &restoreMsTestHelper{}

			rs := &RestoreMicroService{
				namespace:     velerov1api.DefaultNamespace,
				pvrName:       pvrName,
				ctx:           context.Background(),
				client:        fakeClient,
				dataPathMgr:   datapath.NewManager(1),
				eventRecorder: rt,
				resultSignal:  make(chan dataPathResult),
				logger:        velerotest.NewLogger(),
			}

			// Per-case overrides of the defaults set above.
			if test.ctx != nil {
				rs.ctx = test.ctx
			}

			if test.dataPathMgr != nil {
				rs.dataPathMgr = test.dataPathMgr
			}

			// Replace the data path factory with one returning a mock whose
			// expectations match the stage this case is meant to reach.
			datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
				fsBR := datapathmockes.NewAsyncBR(t)
				if test.initErr != nil {
					fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr)
				}

				if test.startErr != nil {
					fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
					fsBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.startErr)
				}

				if test.dataPathStarted {
					fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
					fsBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(nil)
				}

				return fsBR
			}

			// Deliver the canned result after a short delay, from a separate
			// goroutine because resultSignal is unbuffered.
			if test.result != nil {
				go func() {
					time.Sleep(time.Millisecond * 500)
					rs.resultSignal <- *test.result
				}()
			}

			result, err := rs.RunCancelableDataPath(test.ctx)

			if test.expectedErr != "" {
				assert.EqualError(t, err, test.expectedErr)
			} else {
				assert.NoError(t, err)
				assert.Equal(t, test.result.result, result)
			}

			if test.expectedEventMsg != "" {
				assert.True(t, rt.withEvent)
				assert.Equal(t, test.expectedEventMsg, rt.EventMessage())
			}
		})
	}
}
// TestWriteCompletionMark covers writeCompletionMark with stubbed file-system
// hooks: an empty target path, a PVR without owner references, failures from
// the mkdir and write hooks, and two success variants.
func TestWriteCompletionMark(t *testing.T) {
	tests := []struct {
		name          string
		pvr           *velerov1api.PodVolumeRestore
		result        datapath.RestoreResult
		funcRemoveAll func(string) error
		funcMkdirAll  func(string, os.FileMode) error
		funcWriteFile func(string, []byte, os.FileMode) error
		expectedErr   string
		expectedLog   string
	}{
		{
			name:        "no volume path",
			result:      datapath.RestoreResult{},
			expectedErr: "target volume is empty in restore result",
		},
		{
			name: "no owner reference",
			result: datapath.RestoreResult{
				Target: datapath.AccessPoint{
					ByPath: "fake-volume-path",
				},
			},
			pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, "fake-pvr").Result(),
			funcRemoveAll: func(string) error {
				return nil
			},
			expectedErr: "error finding restore UID",
		},
		{
			name: "mkdir fail",
			result: datapath.RestoreResult{
				Target: datapath.AccessPoint{
					ByPath: "fake-volume-path",
				},
			},
			pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, "fake-pvr").OwnerReference([]metav1.OwnerReference{
				{
					UID: "fake-uid",
				},
			}).Result(),
			funcRemoveAll: func(string) error {
				return nil
			},
			funcMkdirAll: func(string, os.FileMode) error {
				return errors.New("fake-mk-dir-error")
			},
			expectedErr: "error creating .velero directory for done file: fake-mk-dir-error",
		},
		{
			name: "write file fail",
			result: datapath.RestoreResult{
				Target: datapath.AccessPoint{
					ByPath: "fake-volume-path",
				},
			},
			pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, "fake-pvr").OwnerReference([]metav1.OwnerReference{
				{
					UID: "fake-uid",
				},
			}).Result(),
			funcRemoveAll: func(string) error {
				return nil
			},
			funcMkdirAll: func(string, os.FileMode) error {
				return nil
			},
			funcWriteFile: func(string, []byte, os.FileMode) error {
				return errors.New("fake-write-file-error")
			},
			expectedErr: "error writing done file: fake-write-file-error",
		},
		{
			name: "succeed",
			result: datapath.RestoreResult{
				Target: datapath.AccessPoint{
					ByPath: "fake-volume-path",
				},
			},
			pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, "fake-pvr").OwnerReference([]metav1.OwnerReference{
				{
					UID: "fake-uid",
				},
			}).Result(),
			funcRemoveAll: func(string) error {
				return nil
			},
			funcMkdirAll: func(string, os.FileMode) error {
				return nil
			},
			funcWriteFile: func(string, []byte, os.FileMode) error {
				return nil
			},
		},
		{
			name: "succeed but previous dir is not removed",
			result: datapath.RestoreResult{
				Target: datapath.AccessPoint{
					ByPath: "fake-volume-path",
				},
			},
			pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, "fake-pvr").OwnerReference([]metav1.OwnerReference{
				{
					UID: "fake-uid",
				},
			}).Result(),
			funcRemoveAll: func(string) error {
				return errors.New("fake-remove-dir-error")
			},
			funcMkdirAll: func(string, os.FileMode) error {
				return nil
			},
			funcWriteFile: func(string, []byte, os.FileMode) error {
				return nil
			},
			expectedLog: "Failed to remove .velero directory from directory fake-volume-path",
		},
	}

	// The three file-system hooks are package-level variables. Remember their
	// values on entry so they can be reset between subtests and restored on
	// exit, instead of leaking stubs into later tests.
	origRemoveAll, origMkdirAll, origWriteFile := funcRemoveAll, funcMkdirAll, funcWriteFile
	defer func() {
		funcRemoveAll, funcMkdirAll, funcWriteFile = origRemoveAll, origMkdirAll, origWriteFile
	}()

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// Start every case from the same baseline so a case that leaves a
			// hook unset does not inherit the previous case's stub.
			funcRemoveAll, funcMkdirAll, funcWriteFile = origRemoveAll, origMkdirAll, origWriteFile

			if test.funcRemoveAll != nil {
				funcRemoveAll = test.funcRemoveAll
			}

			if test.funcMkdirAll != nil {
				funcMkdirAll = test.funcMkdirAll
			}

			if test.funcWriteFile != nil {
				funcWriteFile = test.funcWriteFile
			}

			// The logger output is captured in logBuffer for the expectedLog check.
			logBuffer := ""
			err := writeCompletionMark(test.pvr, test.result, velerotest.NewSingleLogger(&logBuffer))

			if test.expectedErr == "" {
				assert.NoError(t, err)
			} else {
				assert.EqualError(t, err, test.expectedErr)
			}

			if test.expectedLog != "" {
				assert.Contains(t, logBuffer, test.expectedLog)
			}
		})
	}
}