Merge pull request #9005 from Lyndon-Li/vgdp-ms-pvr-data-path
Data path for VGDP ms PVRpull/9014/head
commit
fbaf21ee41
|
@ -0,0 +1 @@
|
||||||
|
Fix issue #8988, add data path for VGDP ms PVR
|
|
@ -77,6 +77,11 @@ spec:
|
||||||
BackupStorageLocation is the name of the backup storage location
|
BackupStorageLocation is the name of the backup storage location
|
||||||
where the backup repository is stored.
|
where the backup repository is stored.
|
||||||
type: string
|
type: string
|
||||||
|
cancel:
|
||||||
|
description: |-
|
||||||
|
Cancel indicates request to cancel the ongoing PodVolumeRestore. It can be set
|
||||||
|
when the PodVolumeRestore is in InProgress phase
|
||||||
|
type: boolean
|
||||||
pod:
|
pod:
|
||||||
description: Pod is a reference to the pod containing the volume to
|
description: Pod is a reference to the pod containing the volume to
|
||||||
be restored.
|
be restored.
|
||||||
|
|
File diff suppressed because one or more lines are too long
|
@ -54,6 +54,10 @@ type PodVolumeRestoreSpec struct {
|
||||||
// +optional
|
// +optional
|
||||||
// +nullable
|
// +nullable
|
||||||
UploaderSettings map[string]string `json:"uploaderSettings,omitempty"`
|
UploaderSettings map[string]string `json:"uploaderSettings,omitempty"`
|
||||||
|
|
||||||
|
// Cancel indicates request to cancel the ongoing PodVolumeRestore. It can be set
|
||||||
|
// when the PodVolumeRestore is in InProgress phase
|
||||||
|
Cancel bool `json:"cancel,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// PodVolumeRestorePhase represents the lifecycle phase of a PodVolumeRestore.
|
// PodVolumeRestorePhase represents the lifecycle phase of a PodVolumeRestore.
|
||||||
|
|
|
@ -97,3 +97,9 @@ func (b *PodVolumeRestoreBuilder) UploaderType(uploaderType string) *PodVolumeRe
|
||||||
b.object.Spec.UploaderType = uploaderType
|
b.object.Spec.UploaderType = uploaderType
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OwnerReference sets the OwnerReference for this PodVolumeRestore.
|
||||||
|
func (b *PodVolumeRestoreBuilder) OwnerReference(ownerRef []metav1.OwnerReference) *PodVolumeRestoreBuilder {
|
||||||
|
b.object.OwnerReferences = ownerRef
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ func NewCommand(f client.Factory) *cobra.Command {
|
||||||
|
|
||||||
command.AddCommand(
|
command.AddCommand(
|
||||||
NewBackupCommand(f),
|
NewBackupCommand(f),
|
||||||
|
NewRestoreCommand(f),
|
||||||
)
|
)
|
||||||
|
|
||||||
return command
|
return command
|
||||||
|
|
|
@ -0,0 +1,298 @@
|
||||||
|
/*
|
||||||
|
Copyright The Velero Contributors.
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package podvolume
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/bombsimon/logrusr/v3"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
corev1api "k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
ctrl "sigs.k8s.io/controller-runtime"
|
||||||
|
|
||||||
|
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||||
|
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/buildinfo"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/client"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/datapath"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/podvolume"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/util/logging"
|
||||||
|
|
||||||
|
ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
|
||||||
|
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
)
|
||||||
|
|
||||||
|
type podVolumeRestoreConfig struct {
|
||||||
|
volumePath string
|
||||||
|
pvrName string
|
||||||
|
resourceTimeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRestoreCommand(f client.Factory) *cobra.Command {
|
||||||
|
logLevelFlag := logging.LogLevelFlag(logrus.InfoLevel)
|
||||||
|
formatFlag := logging.NewFormatFlag()
|
||||||
|
|
||||||
|
config := podVolumeRestoreConfig{}
|
||||||
|
|
||||||
|
command := &cobra.Command{
|
||||||
|
Use: "restore",
|
||||||
|
Short: "Run the velero pod volume restore",
|
||||||
|
Long: "Run the velero pod volume restore",
|
||||||
|
Hidden: true,
|
||||||
|
Run: func(c *cobra.Command, args []string) {
|
||||||
|
logLevel := logLevelFlag.Parse()
|
||||||
|
logrus.Infof("Setting log-level to %s", strings.ToUpper(logLevel.String()))
|
||||||
|
|
||||||
|
logger := logging.DefaultLogger(logLevel, formatFlag.Parse())
|
||||||
|
logger.Infof("Starting Velero pod volume restore %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())
|
||||||
|
|
||||||
|
f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
|
||||||
|
s, err := newPodVolumeRestore(logger, f, config)
|
||||||
|
if err != nil {
|
||||||
|
kube.ExitPodWithMessage(logger, false, "Failed to create pod volume restore, %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.run()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
command.Flags().Var(logLevelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(logLevelFlag.AllowedValues(), ", ")))
|
||||||
|
command.Flags().Var(formatFlag, "log-format", fmt.Sprintf("The format for log output. Valid values are %s.", strings.Join(formatFlag.AllowedValues(), ", ")))
|
||||||
|
command.Flags().StringVar(&config.volumePath, "volume-path", config.volumePath, "The full path of the volume to be restored")
|
||||||
|
command.Flags().StringVar(&config.pvrName, "pod-volume-restore", config.pvrName, "The PVR name")
|
||||||
|
command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters.")
|
||||||
|
|
||||||
|
_ = command.MarkFlagRequired("volume-path")
|
||||||
|
_ = command.MarkFlagRequired("pod-volume-restore")
|
||||||
|
_ = command.MarkFlagRequired("resource-timeout")
|
||||||
|
|
||||||
|
command.PreRunE = func(cmd *cobra.Command, args []string) error {
|
||||||
|
if config.resourceTimeout <= 0 {
|
||||||
|
return errors.New("resource-timeout must be greater than 0")
|
||||||
|
}
|
||||||
|
if config.volumePath == "" {
|
||||||
|
return errors.New("volume-path cannot be empty")
|
||||||
|
}
|
||||||
|
if config.pvrName == "" {
|
||||||
|
return errors.New("pod-volume-restore name cannot be empty")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return command
|
||||||
|
}
|
||||||
|
|
||||||
|
type podVolumeRestore struct {
|
||||||
|
logger logrus.FieldLogger
|
||||||
|
ctx context.Context
|
||||||
|
cancelFunc context.CancelFunc
|
||||||
|
client ctlclient.Client
|
||||||
|
cache ctlcache.Cache
|
||||||
|
namespace string
|
||||||
|
nodeName string
|
||||||
|
config podVolumeRestoreConfig
|
||||||
|
kubeClient kubernetes.Interface
|
||||||
|
dataPathMgr *datapath.Manager
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPodVolumeRestore(logger logrus.FieldLogger, factory client.Factory, config podVolumeRestoreConfig) (*podVolumeRestore, error) {
|
||||||
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
clientConfig, err := factory.ClientConfig()
|
||||||
|
if err != nil {
|
||||||
|
cancelFunc()
|
||||||
|
return nil, errors.Wrap(err, "error to create client config")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctrl.SetLogger(logrusr.New(logger))
|
||||||
|
klog.SetLogger(logrusr.New(logger)) // klog.Logger is used by k8s.io/client-go
|
||||||
|
|
||||||
|
scheme := runtime.NewScheme()
|
||||||
|
if err := velerov1api.AddToScheme(scheme); err != nil {
|
||||||
|
cancelFunc()
|
||||||
|
return nil, errors.Wrap(err, "error to add velero v1 scheme")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := corev1api.AddToScheme(scheme); err != nil {
|
||||||
|
cancelFunc()
|
||||||
|
return nil, errors.Wrap(err, "error to add core v1 scheme")
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeName := os.Getenv("NODE_NAME")
|
||||||
|
|
||||||
|
// use a field selector to filter to only pods scheduled on this node.
|
||||||
|
cacheOption := ctlcache.Options{
|
||||||
|
Scheme: scheme,
|
||||||
|
ByObject: map[ctlclient.Object]ctlcache.ByObject{
|
||||||
|
&corev1api.Pod{}: {
|
||||||
|
Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
|
||||||
|
},
|
||||||
|
&velerov1api.PodVolumeRestore{}: {
|
||||||
|
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cli, err := ctlclient.New(clientConfig, ctlclient.Options{
|
||||||
|
Scheme: scheme,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
cancelFunc()
|
||||||
|
return nil, errors.Wrap(err, "error to create client")
|
||||||
|
}
|
||||||
|
|
||||||
|
var cache ctlcache.Cache
|
||||||
|
retry := 10
|
||||||
|
for {
|
||||||
|
cache, err = ctlcache.New(clientConfig, cacheOption)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
retry--
|
||||||
|
if retry == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.WithError(err).Warn("Failed to create client cache, need retry")
|
||||||
|
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
cancelFunc()
|
||||||
|
return nil, errors.Wrap(err, "error to create client cache")
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &podVolumeRestore{
|
||||||
|
logger: logger,
|
||||||
|
ctx: ctx,
|
||||||
|
cancelFunc: cancelFunc,
|
||||||
|
client: cli,
|
||||||
|
cache: cache,
|
||||||
|
config: config,
|
||||||
|
namespace: factory.Namespace(),
|
||||||
|
nodeName: nodeName,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.kubeClient, err = factory.KubeClient()
|
||||||
|
if err != nil {
|
||||||
|
cancelFunc()
|
||||||
|
return nil, errors.Wrap(err, "error to create kube client")
|
||||||
|
}
|
||||||
|
|
||||||
|
s.dataPathMgr = datapath.NewManager(1)
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var funcCreateDataPathRestore = (*podVolumeRestore).createDataPathService
|
||||||
|
|
||||||
|
func (s *podVolumeRestore) run() {
|
||||||
|
signals.CancelOnShutdown(s.cancelFunc, s.logger)
|
||||||
|
go func() {
|
||||||
|
if err := s.cache.Start(s.ctx); err != nil {
|
||||||
|
s.logger.WithError(err).Warn("error starting cache")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
s.runDataPath()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *podVolumeRestore) runDataPath() {
|
||||||
|
s.logger.Infof("Starting micro service in node %s for PVR %s", s.nodeName, s.config.pvrName)
|
||||||
|
|
||||||
|
dpService, err := funcCreateDataPathRestore(s)
|
||||||
|
if err != nil {
|
||||||
|
s.cancelFunc()
|
||||||
|
funcExitWithMessage(s.logger, false, "Failed to create data path service for PVR %s: %v", s.config.pvrName, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Infof("Starting data path service %s", s.config.pvrName)
|
||||||
|
|
||||||
|
err = dpService.Init()
|
||||||
|
if err != nil {
|
||||||
|
dpService.Shutdown()
|
||||||
|
s.cancelFunc()
|
||||||
|
funcExitWithMessage(s.logger, false, "Failed to init data path service for PVR %s: %v", s.config.pvrName, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Infof("Running data path service %s", s.config.pvrName)
|
||||||
|
|
||||||
|
result, err := dpService.RunCancelableDataPath(s.ctx)
|
||||||
|
if err != nil {
|
||||||
|
dpService.Shutdown()
|
||||||
|
s.cancelFunc()
|
||||||
|
funcExitWithMessage(s.logger, false, "Failed to run data path service for PVR %s: %v", s.config.pvrName, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.WithField("PVR", s.config.pvrName).Info("Data path service completed")
|
||||||
|
|
||||||
|
dpService.Shutdown()
|
||||||
|
|
||||||
|
s.logger.WithField("PVR", s.config.pvrName).Info("Data path service is shut down")
|
||||||
|
|
||||||
|
s.cancelFunc()
|
||||||
|
|
||||||
|
funcExitWithMessage(s.logger, true, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *podVolumeRestore) createDataPathService() (dataPathService, error) {
|
||||||
|
credentialFileStore, err := funcNewCredentialFileStore(
|
||||||
|
s.client,
|
||||||
|
s.namespace,
|
||||||
|
credentials.DefaultStoreDirectory(),
|
||||||
|
filesystem.NewFileSystem(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "error to create credential file store")
|
||||||
|
}
|
||||||
|
|
||||||
|
credSecretStore, err := funcNewCredentialSecretStore(s.client, s.namespace)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "error to create credential secret store")
|
||||||
|
}
|
||||||
|
|
||||||
|
credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
|
||||||
|
|
||||||
|
pvrInformer, err := s.cache.GetInformer(s.ctx, &velerov1api.PodVolumeRestore{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "error to get controller-runtime informer from manager")
|
||||||
|
}
|
||||||
|
|
||||||
|
repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout)
|
||||||
|
|
||||||
|
return podvolume.NewRestoreMicroService(s.ctx, s.client, s.kubeClient, s.config.pvrName, s.namespace, s.nodeName, datapath.AccessPoint{
|
||||||
|
ByPath: s.config.volumePath,
|
||||||
|
VolMode: uploader.PersistentVolumeFilesystem,
|
||||||
|
}, s.dataPathMgr, repoEnsurer, credGetter, pvrInformer, s.logger), nil
|
||||||
|
}
|
|
@ -0,0 +1,166 @@
|
||||||
|
/*
|
||||||
|
Copyright The Velero Contributors.
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package podvolume
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
|
||||||
|
cacheMock "github.com/vmware-tanzu/velero/pkg/cmd/cli/datamover/mocks"
|
||||||
|
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||||
|
)
|
||||||
|
|
||||||
|
func fakeCreateRestoreDataPathServiceWithErr(_ *podVolumeRestore) (dataPathService, error) {
|
||||||
|
return nil, errors.New("fake-create-data-path-error")
|
||||||
|
}
|
||||||
|
|
||||||
|
func fakeCreateRestoreDataPathService(_ *podVolumeRestore) (dataPathService, error) {
|
||||||
|
return frHelper, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunRestoreDataPath(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
pvrName string
|
||||||
|
createDataPathFail bool
|
||||||
|
initDataPathErr error
|
||||||
|
runCancelableDataPathErr error
|
||||||
|
runCancelableDataPathResult string
|
||||||
|
expectedMessage string
|
||||||
|
expectedSucceed bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "create data path failed",
|
||||||
|
pvrName: "fake-name",
|
||||||
|
createDataPathFail: true,
|
||||||
|
expectedMessage: "Failed to create data path service for PVR fake-name: fake-create-data-path-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "init data path failed",
|
||||||
|
pvrName: "fake-name",
|
||||||
|
initDataPathErr: errors.New("fake-init-data-path-error"),
|
||||||
|
expectedMessage: "Failed to init data path service for PVR fake-name: fake-init-data-path-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "run data path failed",
|
||||||
|
pvrName: "fake-name",
|
||||||
|
runCancelableDataPathErr: errors.New("fake-run-data-path-error"),
|
||||||
|
expectedMessage: "Failed to run data path service for PVR fake-name: fake-run-data-path-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "succeed",
|
||||||
|
pvrName: "fake-name",
|
||||||
|
runCancelableDataPathResult: "fake-run-data-path-result",
|
||||||
|
expectedMessage: "fake-run-data-path-result",
|
||||||
|
expectedSucceed: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
frHelper = &fakeRunHelper{
|
||||||
|
initErr: test.initDataPathErr,
|
||||||
|
runCancelableDataPathErr: test.runCancelableDataPathErr,
|
||||||
|
runCancelableDataPathResult: test.runCancelableDataPathResult,
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.createDataPathFail {
|
||||||
|
funcCreateDataPathRestore = fakeCreateRestoreDataPathServiceWithErr
|
||||||
|
} else {
|
||||||
|
funcCreateDataPathRestore = fakeCreateRestoreDataPathService
|
||||||
|
}
|
||||||
|
|
||||||
|
funcExitWithMessage = frHelper.ExitWithMessage
|
||||||
|
|
||||||
|
s := &podVolumeRestore{
|
||||||
|
logger: velerotest.NewLogger(),
|
||||||
|
cancelFunc: func() {},
|
||||||
|
config: podVolumeRestoreConfig{
|
||||||
|
pvrName: test.pvrName,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
s.runDataPath()
|
||||||
|
|
||||||
|
assert.Equal(t, test.expectedMessage, frHelper.exitMessage)
|
||||||
|
assert.Equal(t, test.expectedSucceed, frHelper.succeed)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCreateRestoreDataPathService(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fileStoreErr error
|
||||||
|
secretStoreErr error
|
||||||
|
mockGetInformer bool
|
||||||
|
getInformerErr error
|
||||||
|
expectedError string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "create credential file store error",
|
||||||
|
fileStoreErr: errors.New("fake-file-store-error"),
|
||||||
|
expectedError: "error to create credential file store: fake-file-store-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "create credential secret store",
|
||||||
|
secretStoreErr: errors.New("fake-secret-store-error"),
|
||||||
|
expectedError: "error to create credential secret store: fake-secret-store-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "get informer error",
|
||||||
|
mockGetInformer: true,
|
||||||
|
getInformerErr: errors.New("fake-get-informer-error"),
|
||||||
|
expectedError: "error to get controller-runtime informer from manager: fake-get-informer-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "succeed",
|
||||||
|
mockGetInformer: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
fcHelper := &fakeCreateDataPathServiceHelper{
|
||||||
|
fileStoreErr: test.fileStoreErr,
|
||||||
|
secretStoreErr: test.secretStoreErr,
|
||||||
|
}
|
||||||
|
|
||||||
|
funcNewCredentialFileStore = fcHelper.NewNamespacedFileStore
|
||||||
|
funcNewCredentialSecretStore = fcHelper.NewNamespacedSecretStore
|
||||||
|
|
||||||
|
cache := cacheMock.NewCache(t)
|
||||||
|
if test.mockGetInformer {
|
||||||
|
cache.On("GetInformer", mock.Anything, mock.Anything).Return(nil, test.getInformerErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
funcExitWithMessage = frHelper.ExitWithMessage
|
||||||
|
|
||||||
|
s := &podVolumeRestore{
|
||||||
|
cache: cache,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := s.createDataPathService()
|
||||||
|
|
||||||
|
if test.expectedError != "" {
|
||||||
|
assert.EqualError(t, err, test.expectedError)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,345 @@
|
||||||
|
/*
|
||||||
|
Copyright The Velero Contributors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package podvolume
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
|
||||||
|
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||||
|
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/datapath"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||||
|
|
||||||
|
cachetool "k8s.io/client-go/tools/cache"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RestoreMicroService process data mover restores inside the restore pod
|
||||||
|
type RestoreMicroService struct {
|
||||||
|
ctx context.Context
|
||||||
|
client client.Client
|
||||||
|
kubeClient kubernetes.Interface
|
||||||
|
repoEnsurer *repository.Ensurer
|
||||||
|
credentialGetter *credentials.CredentialGetter
|
||||||
|
logger logrus.FieldLogger
|
||||||
|
dataPathMgr *datapath.Manager
|
||||||
|
eventRecorder kube.EventRecorder
|
||||||
|
|
||||||
|
namespace string
|
||||||
|
pvrName string
|
||||||
|
pvr *velerov1api.PodVolumeRestore
|
||||||
|
sourceTargetPath datapath.AccessPoint
|
||||||
|
|
||||||
|
resultSignal chan dataPathResult
|
||||||
|
|
||||||
|
pvrInformer cache.Informer
|
||||||
|
pvrHandler cachetool.ResourceEventHandlerRegistration
|
||||||
|
nodeName string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, pvrName string, namespace string, nodeName string,
|
||||||
|
sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter,
|
||||||
|
pvrInformer cache.Informer, log logrus.FieldLogger) *RestoreMicroService {
|
||||||
|
return &RestoreMicroService{
|
||||||
|
ctx: ctx,
|
||||||
|
client: client,
|
||||||
|
kubeClient: kubeClient,
|
||||||
|
credentialGetter: cred,
|
||||||
|
logger: log,
|
||||||
|
repoEnsurer: repoEnsurer,
|
||||||
|
dataPathMgr: dataPathMgr,
|
||||||
|
namespace: namespace,
|
||||||
|
pvrName: pvrName,
|
||||||
|
sourceTargetPath: sourceTargetPath,
|
||||||
|
nodeName: nodeName,
|
||||||
|
resultSignal: make(chan dataPathResult),
|
||||||
|
pvrInformer: pvrInformer,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RestoreMicroService) Init() error {
|
||||||
|
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.pvrName, r.nodeName, r.logger)
|
||||||
|
|
||||||
|
handler, err := r.pvrInformer.AddEventHandler(
|
||||||
|
cachetool.ResourceEventHandlerFuncs{
|
||||||
|
UpdateFunc: func(oldObj any, newObj any) {
|
||||||
|
oldPvr := oldObj.(*velerov1api.PodVolumeRestore)
|
||||||
|
newPvr := newObj.(*velerov1api.PodVolumeRestore)
|
||||||
|
|
||||||
|
if newPvr.Name != r.pvrName {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if newPvr.Status.Phase != velerov1api.PodVolumeRestorePhaseInProgress {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if newPvr.Spec.Cancel && !oldPvr.Spec.Cancel {
|
||||||
|
r.cancelPodVolumeRestore(newPvr)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "error adding PVR handler")
|
||||||
|
}
|
||||||
|
|
||||||
|
r.pvrHandler = handler
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
|
||||||
|
log := r.logger.WithFields(logrus.Fields{
|
||||||
|
"PVR": r.pvrName,
|
||||||
|
})
|
||||||
|
|
||||||
|
pvr := &velerov1api.PodVolumeRestore{}
|
||||||
|
err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) {
|
||||||
|
err := r.client.Get(ctx, types.NamespacedName{
|
||||||
|
Namespace: r.namespace,
|
||||||
|
Name: r.pvrName,
|
||||||
|
}, pvr)
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return true, errors.Wrapf(err, "error to get PVR %s", r.pvrName)
|
||||||
|
}
|
||||||
|
|
||||||
|
if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress {
|
||||||
|
return true, nil
|
||||||
|
} else {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("Failed to wait PVR")
|
||||||
|
return "", errors.Wrap(err, "error waiting for PVR")
|
||||||
|
}
|
||||||
|
|
||||||
|
r.pvr = pvr
|
||||||
|
|
||||||
|
log.Info("Run cancelable PVR")
|
||||||
|
|
||||||
|
callbacks := datapath.Callbacks{
|
||||||
|
OnCompleted: r.OnPvrCompleted,
|
||||||
|
OnFailed: r.OnPvrFailed,
|
||||||
|
OnCancelled: r.OnPvrCancelled,
|
||||||
|
OnProgress: r.OnPvrProgress,
|
||||||
|
}
|
||||||
|
|
||||||
|
fsRestore, err := r.dataPathMgr.CreateFileSystemBR(pvr.Name, podVolumeRequestor, ctx, r.client, pvr.Namespace, callbacks, log)
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.Wrap(err, "error to create data path")
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug("Async fs br created")
|
||||||
|
|
||||||
|
if err := fsRestore.Init(ctx,
|
||||||
|
&datapath.FSBRInitParam{
|
||||||
|
BSLName: pvr.Spec.BackupStorageLocation,
|
||||||
|
SourceNamespace: pvr.Spec.SourceNamespace,
|
||||||
|
UploaderType: pvr.Spec.UploaderType,
|
||||||
|
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
|
||||||
|
RepoIdentifier: "",
|
||||||
|
RepositoryEnsurer: r.repoEnsurer,
|
||||||
|
CredentialGetter: r.credentialGetter,
|
||||||
|
}); err != nil {
|
||||||
|
return "", errors.Wrap(err, "error to initialize data path")
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Async fs br init")
|
||||||
|
|
||||||
|
if err := fsRestore.StartRestore(pvr.Spec.SnapshotID, r.sourceTargetPath, pvr.Spec.UploaderSettings); err != nil {
|
||||||
|
return "", errors.Wrap(err, "error starting data path restore")
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Async fs restore data path started")
|
||||||
|
r.eventRecorder.Event(pvr, false, datapath.EventReasonStarted, "Data path for %s started", pvr.Name)
|
||||||
|
|
||||||
|
result := ""
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
err = errors.New("timed out waiting for fs restore to complete")
|
||||||
|
break
|
||||||
|
case res := <-r.resultSignal:
|
||||||
|
err = res.err
|
||||||
|
result = res.result
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("Async fs restore was not completed")
|
||||||
|
}
|
||||||
|
|
||||||
|
r.eventRecorder.EndingEvent(pvr, false, datapath.EventReasonStopped, "Data path for %s stopped", pvr.Name)
|
||||||
|
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RestoreMicroService) Shutdown() {
|
||||||
|
r.eventRecorder.Shutdown()
|
||||||
|
r.closeDataPath(r.ctx, r.pvrName)
|
||||||
|
|
||||||
|
if r.pvrHandler != nil {
|
||||||
|
if err := r.pvrInformer.RemoveEventHandler(r.pvrHandler); err != nil {
|
||||||
|
r.logger.WithError(err).Warn("Failed to remove pod handler")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var funcWriteCompletionMark = writeCompletionMark
|
||||||
|
|
||||||
|
func (r *RestoreMicroService) OnPvrCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) {
|
||||||
|
log := r.logger.WithField("PVR", pvrName)
|
||||||
|
|
||||||
|
err := funcWriteCompletionMark(r.pvr, result.Restore, log)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Warnf("Failed to write completion mark, restored pod may failed to start")
|
||||||
|
}
|
||||||
|
|
||||||
|
restoreBytes, err := funcMarshal(result.Restore)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Errorf("Failed to marshal restore result %v", result.Restore)
|
||||||
|
r.recordPvrFailed(fmt.Sprintf("error marshaling restore result %v", result.Restore), err)
|
||||||
|
} else {
|
||||||
|
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonCompleted, string(restoreBytes))
|
||||||
|
r.resultSignal <- dataPathResult{
|
||||||
|
result: string(restoreBytes),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Async fs restore data path completed")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RestoreMicroService) recordPvrFailed(msg string, err error) {
|
||||||
|
evtMsg := fmt.Sprintf("%s, error %v", msg, err)
|
||||||
|
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonFailed, evtMsg)
|
||||||
|
r.resultSignal <- dataPathResult{
|
||||||
|
err: errors.Wrapf(err, msg),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RestoreMicroService) OnPvrFailed(ctx context.Context, namespace string, pvrName string, err error) {
|
||||||
|
log := r.logger.WithField("PVR", pvrName)
|
||||||
|
log.WithError(err).Error("Async fs restore data path failed")
|
||||||
|
|
||||||
|
r.recordPvrFailed(fmt.Sprintf("Data path for PVR %s failed", pvrName), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RestoreMicroService) OnPvrCancelled(ctx context.Context, namespace string, pvrName string) {
|
||||||
|
log := r.logger.WithField("PVR", pvrName)
|
||||||
|
log.Warn("Async fs restore data path canceled")
|
||||||
|
|
||||||
|
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonCancelled, "Data path for PVR %s canceled", pvrName)
|
||||||
|
r.resultSignal <- dataPathResult{
|
||||||
|
err: errors.New(datapath.ErrCancelled),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RestoreMicroService) OnPvrProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) {
|
||||||
|
log := r.logger.WithFields(logrus.Fields{
|
||||||
|
"PVR": pvrName,
|
||||||
|
})
|
||||||
|
|
||||||
|
progressBytes, err := funcMarshal(progress)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Errorf("Failed to marshal progress %v", progress)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonProgress, string(progressBytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RestoreMicroService) closeDataPath(ctx context.Context, pvrName string) {
|
||||||
|
fsRestore := r.dataPathMgr.GetAsyncBR(pvrName)
|
||||||
|
if fsRestore != nil {
|
||||||
|
fsRestore.Close(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.dataPathMgr.RemoveAsyncBR(pvrName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RestoreMicroService) cancelPodVolumeRestore(pvr *velerov1api.PodVolumeRestore) {
|
||||||
|
r.logger.WithField("PVR", pvr.Name).Info("PVR is being canceled")
|
||||||
|
|
||||||
|
r.eventRecorder.Event(pvr, false, datapath.EventReasonCancelling, "Canceling for PVR %s", pvr.Name)
|
||||||
|
|
||||||
|
fsBackup := r.dataPathMgr.GetAsyncBR(pvr.Name)
|
||||||
|
if fsBackup == nil {
|
||||||
|
r.OnPvrCancelled(r.ctx, pvr.GetNamespace(), pvr.GetName())
|
||||||
|
} else {
|
||||||
|
fsBackup.Cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var funcRemoveAll = os.RemoveAll
|
||||||
|
var funcMkdirAll = os.MkdirAll
|
||||||
|
var funcWriteFile = os.WriteFile
|
||||||
|
|
||||||
|
func writeCompletionMark(pvr *velerov1api.PodVolumeRestore, result datapath.RestoreResult, log logrus.FieldLogger) error {
|
||||||
|
volumePath := result.Target.ByPath
|
||||||
|
if volumePath == "" {
|
||||||
|
return errors.New("target volume is empty in restore result")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the .velero directory from the restored volume (it may contain done files from previous restores
|
||||||
|
// of this volume, which we don't want to carry over). If this fails for any reason, log and continue, since
|
||||||
|
// this is non-essential cleanup (the done files are named based on restore UID and the init container looks
|
||||||
|
// for the one specific to the restore being executed).
|
||||||
|
if err := funcRemoveAll(filepath.Join(volumePath, ".velero")); err != nil {
|
||||||
|
log.WithError(err).Warnf("Failed to remove .velero directory from directory %s", volumePath)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pvr.OwnerReferences) == 0 {
|
||||||
|
return errors.New("error finding restore UID")
|
||||||
|
}
|
||||||
|
|
||||||
|
restoreUID := pvr.OwnerReferences[0].UID
|
||||||
|
|
||||||
|
// Create the .velero directory within the volume dir so we can write a done file
|
||||||
|
// for this restore.
|
||||||
|
if err := funcMkdirAll(filepath.Join(volumePath, ".velero"), 0755); err != nil {
|
||||||
|
return errors.Wrapf(err, "error creating .velero directory for done file")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write a done file with name=<restore-uid> into the just-created .velero dir
|
||||||
|
// within the volume. The velero init container on the pod is waiting
|
||||||
|
// for this file to exist in each restored volume before completing.
|
||||||
|
if err := funcWriteFile(filepath.Join(volumePath, ".velero", string(restoreUID)), nil, 0644); err != nil {
|
||||||
|
return errors.Wrapf(err, "error writing done file")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,622 @@
|
||||||
|
/*
|
||||||
|
Copyright The Velero Contributors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package podvolume
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/datapath"
|
||||||
|
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||||
|
|
||||||
|
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||||
|
|
||||||
|
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||||
|
|
||||||
|
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||||
|
|
||||||
|
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
|
||||||
|
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
|
||||||
|
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
type restoreMsTestHelper struct {
|
||||||
|
eventReason string
|
||||||
|
eventMsg string
|
||||||
|
marshalErr error
|
||||||
|
marshalBytes []byte
|
||||||
|
withEvent bool
|
||||||
|
eventLock sync.Mutex
|
||||||
|
writeCompletionErr error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rt *restoreMsTestHelper) Event(_ runtime.Object, _ bool, reason string, message string, a ...any) {
|
||||||
|
rt.eventLock.Lock()
|
||||||
|
defer rt.eventLock.Unlock()
|
||||||
|
|
||||||
|
rt.withEvent = true
|
||||||
|
rt.eventReason = reason
|
||||||
|
rt.eventMsg = fmt.Sprintf(message, a...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rt *restoreMsTestHelper) EndingEvent(_ runtime.Object, _ bool, reason string, message string, a ...any) {
|
||||||
|
rt.eventLock.Lock()
|
||||||
|
defer rt.eventLock.Unlock()
|
||||||
|
|
||||||
|
rt.withEvent = true
|
||||||
|
rt.eventReason = reason
|
||||||
|
rt.eventMsg = fmt.Sprintf(message, a...)
|
||||||
|
}
|
||||||
|
func (rt *restoreMsTestHelper) Shutdown() {}
|
||||||
|
|
||||||
|
func (rt *restoreMsTestHelper) Marshal(v any) ([]byte, error) {
|
||||||
|
if rt.marshalErr != nil {
|
||||||
|
return nil, rt.marshalErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return rt.marshalBytes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rt *restoreMsTestHelper) EventReason() string {
|
||||||
|
rt.eventLock.Lock()
|
||||||
|
defer rt.eventLock.Unlock()
|
||||||
|
|
||||||
|
return rt.eventReason
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rt *restoreMsTestHelper) EventMessage() string {
|
||||||
|
rt.eventLock.Lock()
|
||||||
|
defer rt.eventLock.Unlock()
|
||||||
|
|
||||||
|
return rt.eventMsg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rt *restoreMsTestHelper) WriteCompletionMark(*velerov1api.PodVolumeRestore, datapath.RestoreResult, logrus.FieldLogger) error {
|
||||||
|
return rt.writeCompletionErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOnPvrFailed(t *testing.T) {
|
||||||
|
pvrName := "fake-pvr"
|
||||||
|
rt := &restoreMsTestHelper{}
|
||||||
|
|
||||||
|
rs := &RestoreMicroService{
|
||||||
|
pvrName: pvrName,
|
||||||
|
dataPathMgr: datapath.NewManager(1),
|
||||||
|
eventRecorder: rt,
|
||||||
|
resultSignal: make(chan dataPathResult),
|
||||||
|
logger: velerotest.NewLogger(),
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedErr := "Data path for PVR fake-pvr failed: fake-error"
|
||||||
|
expectedEventReason := datapath.EventReasonFailed
|
||||||
|
expectedEventMsg := "Data path for PVR fake-pvr failed, error fake-error"
|
||||||
|
|
||||||
|
go rs.OnPvrFailed(context.TODO(), velerov1api.DefaultNamespace, pvrName, errors.New("fake-error"))
|
||||||
|
|
||||||
|
result := <-rs.resultSignal
|
||||||
|
assert.EqualError(t, result.err, expectedErr)
|
||||||
|
assert.Equal(t, expectedEventReason, rt.EventReason())
|
||||||
|
assert.Equal(t, expectedEventMsg, rt.EventMessage())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPvrCancelled(t *testing.T) {
|
||||||
|
pvrName := "fake-pvr"
|
||||||
|
rt := &restoreMsTestHelper{}
|
||||||
|
|
||||||
|
rs := RestoreMicroService{
|
||||||
|
pvrName: pvrName,
|
||||||
|
dataPathMgr: datapath.NewManager(1),
|
||||||
|
eventRecorder: rt,
|
||||||
|
resultSignal: make(chan dataPathResult),
|
||||||
|
logger: velerotest.NewLogger(),
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedErr := datapath.ErrCancelled
|
||||||
|
expectedEventReason := datapath.EventReasonCancelled
|
||||||
|
expectedEventMsg := "Data path for PVR fake-pvr canceled"
|
||||||
|
|
||||||
|
go rs.OnPvrCancelled(context.TODO(), velerov1api.DefaultNamespace, pvrName)
|
||||||
|
|
||||||
|
result := <-rs.resultSignal
|
||||||
|
assert.EqualError(t, result.err, expectedErr)
|
||||||
|
assert.Equal(t, expectedEventReason, rt.EventReason())
|
||||||
|
assert.Equal(t, expectedEventMsg, rt.EventMessage())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOnPvrCompleted(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
expectedErr string
|
||||||
|
expectedEventReason string
|
||||||
|
expectedEventMsg string
|
||||||
|
marshalErr error
|
||||||
|
marshallStr string
|
||||||
|
writeCompletionErr error
|
||||||
|
expectedLog string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "marshal fail",
|
||||||
|
marshalErr: errors.New("fake-marshal-error"),
|
||||||
|
expectedErr: "error marshaling restore result {{ } 0}: fake-marshal-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "succeed",
|
||||||
|
marshallStr: "fake-complete-string",
|
||||||
|
expectedEventReason: datapath.EventReasonCompleted,
|
||||||
|
expectedEventMsg: "fake-complete-string",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "succeed but write completion mark fail",
|
||||||
|
marshallStr: "fake-complete-string",
|
||||||
|
writeCompletionErr: errors.New("fake-write-completion-error"),
|
||||||
|
expectedEventReason: datapath.EventReasonCompleted,
|
||||||
|
expectedEventMsg: "fake-complete-string",
|
||||||
|
expectedLog: "Failed to write completion mark, restored pod may failed to start",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
pvrName := "fake-pvr"
|
||||||
|
|
||||||
|
rt := &restoreMsTestHelper{
|
||||||
|
marshalErr: test.marshalErr,
|
||||||
|
marshalBytes: []byte(test.marshallStr),
|
||||||
|
writeCompletionErr: test.writeCompletionErr,
|
||||||
|
}
|
||||||
|
|
||||||
|
logBuffer := []string{}
|
||||||
|
|
||||||
|
rs := &RestoreMicroService{
|
||||||
|
dataPathMgr: datapath.NewManager(1),
|
||||||
|
eventRecorder: rt,
|
||||||
|
resultSignal: make(chan dataPathResult),
|
||||||
|
logger: velerotest.NewMultipleLogger(&logBuffer),
|
||||||
|
}
|
||||||
|
|
||||||
|
funcMarshal = rt.Marshal
|
||||||
|
funcWriteCompletionMark = rt.WriteCompletionMark
|
||||||
|
|
||||||
|
go rs.OnPvrCompleted(context.TODO(), velerov1api.DefaultNamespace, pvrName, datapath.Result{})
|
||||||
|
|
||||||
|
result := <-rs.resultSignal
|
||||||
|
if test.marshalErr != nil {
|
||||||
|
assert.EqualError(t, result.err, test.expectedErr)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, result.err)
|
||||||
|
assert.Equal(t, test.expectedEventReason, rt.EventReason())
|
||||||
|
assert.Equal(t, test.expectedEventMsg, rt.EventMessage())
|
||||||
|
|
||||||
|
if test.expectedLog != "" {
|
||||||
|
assert.Contains(t, logBuffer[0], test.expectedLog)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOnPvrProgress(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
expectedErr string
|
||||||
|
expectedEventReason string
|
||||||
|
expectedEventMsg string
|
||||||
|
marshalErr error
|
||||||
|
marshallStr string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "marshal fail",
|
||||||
|
marshalErr: errors.New("fake-marshal-error"),
|
||||||
|
expectedErr: "Failed to marshal restore result",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "succeed",
|
||||||
|
marshallStr: "fake-progress-string",
|
||||||
|
expectedEventReason: datapath.EventReasonProgress,
|
||||||
|
expectedEventMsg: "fake-progress-string",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
pvrName := "fake-pvr"
|
||||||
|
|
||||||
|
rt := &restoreMsTestHelper{
|
||||||
|
marshalErr: test.marshalErr,
|
||||||
|
marshalBytes: []byte(test.marshallStr),
|
||||||
|
}
|
||||||
|
|
||||||
|
rs := &RestoreMicroService{
|
||||||
|
dataPathMgr: datapath.NewManager(1),
|
||||||
|
eventRecorder: rt,
|
||||||
|
logger: velerotest.NewLogger(),
|
||||||
|
}
|
||||||
|
|
||||||
|
funcMarshal = rt.Marshal
|
||||||
|
|
||||||
|
rs.OnPvrProgress(context.TODO(), velerov1api.DefaultNamespace, pvrName, &uploader.Progress{})
|
||||||
|
|
||||||
|
if test.marshalErr != nil {
|
||||||
|
assert.False(t, rt.withEvent)
|
||||||
|
} else {
|
||||||
|
assert.True(t, rt.withEvent)
|
||||||
|
assert.Equal(t, test.expectedEventReason, rt.EventReason())
|
||||||
|
assert.Equal(t, test.expectedEventMsg, rt.EventMessage())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCancelPodVolumeRestore(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
expectedEventReason string
|
||||||
|
expectedEventMsg string
|
||||||
|
expectedErr string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no fs restore",
|
||||||
|
expectedEventReason: datapath.EventReasonCancelled,
|
||||||
|
expectedEventMsg: "Data path for PVR fake-pvr canceled",
|
||||||
|
expectedErr: datapath.ErrCancelled,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
pvrName := "fake-pvr"
|
||||||
|
pvr := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Result()
|
||||||
|
|
||||||
|
rt := &restoreMsTestHelper{}
|
||||||
|
|
||||||
|
rs := &RestoreMicroService{
|
||||||
|
dataPathMgr: datapath.NewManager(1),
|
||||||
|
eventRecorder: rt,
|
||||||
|
resultSignal: make(chan dataPathResult),
|
||||||
|
logger: velerotest.NewLogger(),
|
||||||
|
}
|
||||||
|
|
||||||
|
go rs.cancelPodVolumeRestore(pvr)
|
||||||
|
|
||||||
|
result := <-rs.resultSignal
|
||||||
|
|
||||||
|
assert.EqualError(t, result.err, test.expectedErr)
|
||||||
|
assert.True(t, rt.withEvent)
|
||||||
|
assert.Equal(t, test.expectedEventReason, rt.EventReason())
|
||||||
|
assert.Equal(t, test.expectedEventMsg, rt.EventMessage())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunCancelableDataPathRestore(t *testing.T) {
|
||||||
|
pvrName := "fake-pvr"
|
||||||
|
pvr := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseNew).Result()
|
||||||
|
pvrInProgress := builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result()
|
||||||
|
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
ctx context.Context
|
||||||
|
result *dataPathResult
|
||||||
|
dataPathMgr *datapath.Manager
|
||||||
|
kubeClientObj []runtime.Object
|
||||||
|
initErr error
|
||||||
|
startErr error
|
||||||
|
dataPathStarted bool
|
||||||
|
expectedEventMsg string
|
||||||
|
expectedErr string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no pvr",
|
||||||
|
ctx: ctxTimeout,
|
||||||
|
expectedErr: "error waiting for PVR: context deadline exceeded",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "pvr not in in-progress",
|
||||||
|
ctx: ctxTimeout,
|
||||||
|
kubeClientObj: []runtime.Object{pvr},
|
||||||
|
expectedErr: "error waiting for PVR: context deadline exceeded",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "create data path fail",
|
||||||
|
ctx: context.Background(),
|
||||||
|
kubeClientObj: []runtime.Object{pvrInProgress},
|
||||||
|
dataPathMgr: datapath.NewManager(0),
|
||||||
|
expectedErr: "error to create data path: Concurrent number exceeds",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "init data path fail",
|
||||||
|
ctx: context.Background(),
|
||||||
|
kubeClientObj: []runtime.Object{pvrInProgress},
|
||||||
|
initErr: errors.New("fake-init-error"),
|
||||||
|
expectedErr: "error to initialize data path: fake-init-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "start data path fail",
|
||||||
|
ctx: context.Background(),
|
||||||
|
kubeClientObj: []runtime.Object{pvrInProgress},
|
||||||
|
startErr: errors.New("fake-start-error"),
|
||||||
|
expectedErr: "error starting data path restore: fake-start-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "data path timeout",
|
||||||
|
ctx: ctxTimeout,
|
||||||
|
kubeClientObj: []runtime.Object{pvrInProgress},
|
||||||
|
dataPathStarted: true,
|
||||||
|
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvrName),
|
||||||
|
expectedErr: "timed out waiting for fs restore to complete",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "data path returns error",
|
||||||
|
ctx: context.Background(),
|
||||||
|
kubeClientObj: []runtime.Object{pvrInProgress},
|
||||||
|
dataPathStarted: true,
|
||||||
|
result: &dataPathResult{
|
||||||
|
err: errors.New("fake-data-path-error"),
|
||||||
|
},
|
||||||
|
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvrName),
|
||||||
|
expectedErr: "fake-data-path-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "succeed",
|
||||||
|
ctx: context.Background(),
|
||||||
|
kubeClientObj: []runtime.Object{pvrInProgress},
|
||||||
|
dataPathStarted: true,
|
||||||
|
result: &dataPathResult{
|
||||||
|
result: "fake-succeed-result",
|
||||||
|
},
|
||||||
|
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvrName),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
scheme := runtime.NewScheme()
|
||||||
|
velerov1api.AddToScheme(scheme)
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
fakeClientBuilder := clientFake.NewClientBuilder()
|
||||||
|
fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
|
||||||
|
|
||||||
|
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
|
||||||
|
|
||||||
|
rt := &restoreMsTestHelper{}
|
||||||
|
|
||||||
|
rs := &RestoreMicroService{
|
||||||
|
namespace: velerov1api.DefaultNamespace,
|
||||||
|
pvrName: pvrName,
|
||||||
|
ctx: context.Background(),
|
||||||
|
client: fakeClient,
|
||||||
|
dataPathMgr: datapath.NewManager(1),
|
||||||
|
eventRecorder: rt,
|
||||||
|
resultSignal: make(chan dataPathResult),
|
||||||
|
logger: velerotest.NewLogger(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.ctx != nil {
|
||||||
|
rs.ctx = test.ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.dataPathMgr != nil {
|
||||||
|
rs.dataPathMgr = test.dataPathMgr
|
||||||
|
}
|
||||||
|
|
||||||
|
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||||
|
fsBR := datapathmockes.NewAsyncBR(t)
|
||||||
|
if test.initErr != nil {
|
||||||
|
fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.startErr != nil {
|
||||||
|
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
|
||||||
|
fsBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.startErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.dataPathStarted {
|
||||||
|
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
|
||||||
|
fsBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fsBR
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.result != nil {
|
||||||
|
go func() {
|
||||||
|
time.Sleep(time.Millisecond * 500)
|
||||||
|
rs.resultSignal <- *test.result
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := rs.RunCancelableDataPath(test.ctx)
|
||||||
|
|
||||||
|
if test.expectedErr != "" {
|
||||||
|
assert.EqualError(t, err, test.expectedErr)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, test.result.result, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.expectedEventMsg != "" {
|
||||||
|
assert.True(t, rt.withEvent)
|
||||||
|
assert.Equal(t, test.expectedEventMsg, rt.EventMessage())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteCompletionMark(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
pvr *velerov1api.PodVolumeRestore
|
||||||
|
result datapath.RestoreResult
|
||||||
|
funcRemoveAll func(string) error
|
||||||
|
funcMkdirAll func(string, os.FileMode) error
|
||||||
|
funcWriteFile func(string, []byte, os.FileMode) error
|
||||||
|
expectedErr string
|
||||||
|
expectedLog string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no volume path",
|
||||||
|
result: datapath.RestoreResult{},
|
||||||
|
expectedErr: "target volume is empty in restore result",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no owner reference",
|
||||||
|
result: datapath.RestoreResult{
|
||||||
|
Target: datapath.AccessPoint{
|
||||||
|
ByPath: "fake-volume-path",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, "fake-pvr").Result(),
|
||||||
|
funcRemoveAll: func(string) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
expectedErr: "error finding restore UID",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "mkdir fail",
|
||||||
|
result: datapath.RestoreResult{
|
||||||
|
Target: datapath.AccessPoint{
|
||||||
|
ByPath: "fake-volume-path",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, "fake-pvr").OwnerReference([]metav1.OwnerReference{
|
||||||
|
{
|
||||||
|
UID: "fake-uid",
|
||||||
|
},
|
||||||
|
}).Result(),
|
||||||
|
funcRemoveAll: func(string) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
funcMkdirAll: func(string, os.FileMode) error {
|
||||||
|
return errors.New("fake-mk-dir-error")
|
||||||
|
},
|
||||||
|
expectedErr: "error creating .velero directory for done file: fake-mk-dir-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "write file fail",
|
||||||
|
result: datapath.RestoreResult{
|
||||||
|
Target: datapath.AccessPoint{
|
||||||
|
ByPath: "fake-volume-path",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, "fake-pvr").OwnerReference([]metav1.OwnerReference{
|
||||||
|
{
|
||||||
|
UID: "fake-uid",
|
||||||
|
},
|
||||||
|
}).Result(),
|
||||||
|
funcRemoveAll: func(string) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
funcMkdirAll: func(string, os.FileMode) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
funcWriteFile: func(string, []byte, os.FileMode) error {
|
||||||
|
return errors.New("fake-write-file-error")
|
||||||
|
},
|
||||||
|
expectedErr: "error writing done file: fake-write-file-error",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "succeed",
|
||||||
|
result: datapath.RestoreResult{
|
||||||
|
Target: datapath.AccessPoint{
|
||||||
|
ByPath: "fake-volume-path",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, "fake-pvr").OwnerReference([]metav1.OwnerReference{
|
||||||
|
{
|
||||||
|
UID: "fake-uid",
|
||||||
|
},
|
||||||
|
}).Result(),
|
||||||
|
funcRemoveAll: func(string) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
funcMkdirAll: func(string, os.FileMode) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
funcWriteFile: func(string, []byte, os.FileMode) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "succeed but previous dir is not removed",
|
||||||
|
result: datapath.RestoreResult{
|
||||||
|
Target: datapath.AccessPoint{
|
||||||
|
ByPath: "fake-volume-path",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, "fake-pvr").OwnerReference([]metav1.OwnerReference{
|
||||||
|
{
|
||||||
|
UID: "fake-uid",
|
||||||
|
},
|
||||||
|
}).Result(),
|
||||||
|
funcRemoveAll: func(string) error {
|
||||||
|
return errors.New("fake-remove-dir-error")
|
||||||
|
},
|
||||||
|
funcMkdirAll: func(string, os.FileMode) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
funcWriteFile: func(string, []byte, os.FileMode) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
expectedLog: "Failed to remove .velero directory from directory fake-volume-path",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
if test.funcRemoveAll != nil {
|
||||||
|
funcRemoveAll = test.funcRemoveAll
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.funcMkdirAll != nil {
|
||||||
|
funcMkdirAll = test.funcMkdirAll
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.funcWriteFile != nil {
|
||||||
|
funcWriteFile = test.funcWriteFile
|
||||||
|
}
|
||||||
|
|
||||||
|
logBuffer := ""
|
||||||
|
err := writeCompletionMark(test.pvr, test.result, velerotest.NewSingleLogger(&logBuffer))
|
||||||
|
|
||||||
|
if test.expectedErr == "" {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
} else {
|
||||||
|
assert.EqualError(t, err, test.expectedErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.expectedLog != "" {
|
||||||
|
assert.Contains(t, logBuffer, test.expectedLog)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue