Merge branch 'main' into pod-volume-exposer
commit
5d5d4cd657
|
@ -0,0 +1 @@
|
|||
Add BSL status check for backup/restore operations.
|
|
@ -0,0 +1 @@
|
|||
Fix issue #8988, add data path for VGDP ms pvb
|
|
@ -76,6 +76,11 @@ spec:
|
|||
BackupStorageLocation is the name of the backup storage location
|
||||
where the backup repository is stored.
|
||||
type: string
|
||||
cancel:
|
||||
description: |-
|
||||
Cancel indicates request to cancel the ongoing PodVolumeBackup. It can be set
|
||||
when the PodVolumeBackup is in InProgress phase
|
||||
type: boolean
|
||||
node:
|
||||
description: Node is the name of the node that the Pod is running
|
||||
on.
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,7 @@
|
|||
package credentials
|
||||
|
||||
import "os"
|
||||
|
||||
func DefaultStoreDirectory() string {
|
||||
return os.TempDir() + "/credentials"
|
||||
}
|
|
@ -57,6 +57,10 @@ type PodVolumeBackupSpec struct {
|
|||
// +optional
|
||||
// +nullable
|
||||
UploaderSettings map[string]string `json:"uploaderSettings,omitempty"`
|
||||
|
||||
// Cancel indicates request to cancel the ongoing PodVolumeBackup. It can be set
|
||||
// when the PodVolumeBackup is in InProgress phase
|
||||
Cancel bool `json:"cancel,omitempty"`
|
||||
}
|
||||
|
||||
// PodVolumeBackupPhase represents the lifecycle phase of a PodVolumeBackup.
|
||||
|
|
|
@ -113,3 +113,9 @@ func (b *PodVolumeBackupBuilder) UploaderType(uploaderType string) *PodVolumeBac
|
|||
b.object.Spec.UploaderType = uploaderType
|
||||
return b
|
||||
}
|
||||
|
||||
// Annotations sets the PodVolumeBackup's Annotations.
|
||||
func (b *PodVolumeBackupBuilder) Annotations(annotations map[string]string) *PodVolumeBackupBuilder {
|
||||
b.object.Annotations = annotations
|
||||
return b
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import (
|
|||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/logging"
|
||||
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
|
@ -78,7 +79,7 @@ func NewBackupCommand(f client.Factory) *cobra.Command {
|
|||
f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
|
||||
s, err := newdataMoverBackup(logger, f, config)
|
||||
if err != nil {
|
||||
exitWithMessage(logger, false, "Failed to create data mover backup, %v", err)
|
||||
kube.ExitPodWithMessage(logger, false, "Failed to create data mover backup, %v", err)
|
||||
}
|
||||
|
||||
s.run()
|
||||
|
@ -100,12 +101,6 @@ func NewBackupCommand(f client.Factory) *cobra.Command {
|
|||
return command
|
||||
}
|
||||
|
||||
const (
|
||||
// defaultCredentialsDirectory is the path on disk where credential
|
||||
// files will be written to
|
||||
defaultCredentialsDirectory = "/tmp/credentials"
|
||||
)
|
||||
|
||||
type dataMoverBackup struct {
|
||||
logger logrus.FieldLogger
|
||||
ctx context.Context
|
||||
|
@ -215,7 +210,7 @@ func newdataMoverBackup(logger logrus.FieldLogger, factory client.Factory, confi
|
|||
return s, nil
|
||||
}
|
||||
|
||||
var funcExitWithMessage = exitWithMessage
|
||||
var funcExitWithMessage = kube.ExitPodWithMessage
|
||||
var funcCreateDataPathService = (*dataMoverBackup).createDataPathService
|
||||
|
||||
func (s *dataMoverBackup) run() {
|
||||
|
@ -277,7 +272,7 @@ func (s *dataMoverBackup) createDataPathService() (dataPathService, error) {
|
|||
credentialFileStore, err := funcNewCredentialFileStore(
|
||||
s.client,
|
||||
s.namespace,
|
||||
defaultCredentialsDirectory,
|
||||
credentials.DefaultStoreDirectory(),
|
||||
filesystem.NewFileSystem(),
|
||||
)
|
||||
if err != nil {
|
||||
|
|
|
@ -15,10 +15,7 @@ package datamover
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/client"
|
||||
|
@ -45,30 +42,3 @@ type dataPathService interface {
|
|||
RunCancelableDataPath(context.Context) (string, error)
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
var funcExit = os.Exit
|
||||
var funcCreateFile = os.Create
|
||||
|
||||
func exitWithMessage(logger logrus.FieldLogger, succeed bool, message string, a ...any) {
|
||||
exitCode := 0
|
||||
if !succeed {
|
||||
exitCode = 1
|
||||
}
|
||||
|
||||
toWrite := fmt.Sprintf(message, a...)
|
||||
|
||||
podFile, err := funcCreateFile("/dev/termination-log")
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("Failed to create termination log file")
|
||||
exitCode = 1
|
||||
} else {
|
||||
if _, err := podFile.WriteString(toWrite); err != nil {
|
||||
logger.WithError(err).Error("Failed to write error to termination log file")
|
||||
exitCode = 1
|
||||
}
|
||||
|
||||
podFile.Close()
|
||||
}
|
||||
|
||||
funcExit(exitCode)
|
||||
}
|
||||
|
|
|
@ -1,131 +0,0 @@
|
|||
/*
|
||||
Copyright The Velero Contributors.
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package datamover
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
)
|
||||
|
||||
type exitWithMessageMock struct {
|
||||
createErr error
|
||||
writeFail bool
|
||||
filePath string
|
||||
exitCode int
|
||||
}
|
||||
|
||||
func (em *exitWithMessageMock) Exit(code int) {
|
||||
em.exitCode = code
|
||||
}
|
||||
|
||||
func (em *exitWithMessageMock) CreateFile(name string) (*os.File, error) {
|
||||
if em.createErr != nil {
|
||||
return nil, em.createErr
|
||||
}
|
||||
|
||||
if em.writeFail {
|
||||
return os.OpenFile(em.filePath, os.O_CREATE|os.O_RDONLY, 0500)
|
||||
} else {
|
||||
return os.Create(em.filePath)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExitWithMessage(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
message string
|
||||
succeed bool
|
||||
args []any
|
||||
createErr error
|
||||
writeFail bool
|
||||
expectedExitCode int
|
||||
expectedMessage string
|
||||
}{
|
||||
{
|
||||
name: "create pod file failed",
|
||||
createErr: errors.New("fake-create-file-error"),
|
||||
succeed: true,
|
||||
expectedExitCode: 1,
|
||||
},
|
||||
{
|
||||
name: "write pod file failed",
|
||||
writeFail: true,
|
||||
succeed: true,
|
||||
expectedExitCode: 1,
|
||||
},
|
||||
{
|
||||
name: "not succeed",
|
||||
message: "fake-message-1, arg-1 %s, arg-2 %v, arg-3 %v",
|
||||
args: []any{
|
||||
"arg-1-1",
|
||||
10,
|
||||
false,
|
||||
},
|
||||
expectedExitCode: 1,
|
||||
expectedMessage: fmt.Sprintf("fake-message-1, arg-1 %s, arg-2 %v, arg-3 %v", "arg-1-1", 10, false),
|
||||
},
|
||||
{
|
||||
name: "not succeed",
|
||||
message: "fake-message-2, arg-1 %s, arg-2 %v, arg-3 %v",
|
||||
args: []any{
|
||||
"arg-1-2",
|
||||
20,
|
||||
true,
|
||||
},
|
||||
succeed: true,
|
||||
expectedMessage: fmt.Sprintf("fake-message-2, arg-1 %s, arg-2 %v, arg-3 %v", "arg-1-2", 20, true),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
podFile := filepath.Join(os.TempDir(), uuid.NewString())
|
||||
|
||||
em := exitWithMessageMock{
|
||||
createErr: test.createErr,
|
||||
writeFail: test.writeFail,
|
||||
filePath: podFile,
|
||||
}
|
||||
|
||||
funcExit = em.Exit
|
||||
funcCreateFile = em.CreateFile
|
||||
|
||||
exitWithMessage(velerotest.NewLogger(), test.succeed, test.message, test.args...)
|
||||
|
||||
assert.Equal(t, test.expectedExitCode, em.exitCode)
|
||||
|
||||
if test.createErr == nil && !test.writeFail {
|
||||
reader, err := os.Open(podFile)
|
||||
require.NoError(t, err)
|
||||
|
||||
message, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
reader.Close()
|
||||
|
||||
assert.Equal(t, test.expectedMessage, string(message))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -42,6 +42,7 @@ import (
|
|||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/logging"
|
||||
|
||||
ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
|
@ -76,7 +77,7 @@ func NewRestoreCommand(f client.Factory) *cobra.Command {
|
|||
f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
|
||||
s, err := newdataMoverRestore(logger, f, config)
|
||||
if err != nil {
|
||||
exitWithMessage(logger, false, "Failed to create data mover restore, %v", err)
|
||||
kube.ExitPodWithMessage(logger, false, "Failed to create data mover restore, %v", err)
|
||||
}
|
||||
|
||||
s.run()
|
||||
|
@ -263,7 +264,7 @@ func (s *dataMoverRestore) createDataPathService() (dataPathService, error) {
|
|||
credentialFileStore, err := funcNewCredentialFileStore(
|
||||
s.client,
|
||||
s.namespace,
|
||||
defaultCredentialsDirectory,
|
||||
credentials.DefaultStoreDirectory(),
|
||||
filesystem.NewFileSystem(),
|
||||
)
|
||||
if err != nil {
|
||||
|
|
|
@ -76,10 +76,6 @@ const (
|
|||
// the port where prometheus metrics are exposed
|
||||
defaultMetricsAddress = ":8085"
|
||||
|
||||
// defaultCredentialsDirectory is the path on disk where credential
|
||||
// files will be written to
|
||||
defaultCredentialsDirectory = "/tmp/credentials"
|
||||
|
||||
defaultResourceTimeout = 10 * time.Minute
|
||||
defaultDataMoverPrepareTimeout = 30 * time.Minute
|
||||
defaultDataPathConcurrentNum = 1
|
||||
|
@ -289,7 +285,7 @@ func (s *nodeAgentServer) run() {
|
|||
credentialFileStore, err := credentials.NewNamespacedFileStore(
|
||||
s.mgr.GetClient(),
|
||||
s.namespace,
|
||||
defaultCredentialsDirectory,
|
||||
credentials.DefaultStoreDirectory(),
|
||||
filesystem.NewFileSystem(),
|
||||
)
|
||||
if err != nil {
|
||||
|
|
|
@ -0,0 +1,291 @@
|
|||
/*
|
||||
Copyright The Velero Contributors.
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podvolume
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/bombsimon/logrusr/v3"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
"github.com/vmware-tanzu/velero/pkg/buildinfo"
|
||||
"github.com/vmware-tanzu/velero/pkg/client"
|
||||
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
|
||||
"github.com/vmware-tanzu/velero/pkg/datapath"
|
||||
"github.com/vmware-tanzu/velero/pkg/podvolume"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/logging"
|
||||
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
|
||||
ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
type podVolumeBackupConfig struct {
|
||||
volumePath string
|
||||
pvbName string
|
||||
resourceTimeout time.Duration
|
||||
}
|
||||
|
||||
func NewBackupCommand(f client.Factory) *cobra.Command {
|
||||
config := podVolumeBackupConfig{}
|
||||
|
||||
logLevelFlag := logging.LogLevelFlag(logrus.InfoLevel)
|
||||
formatFlag := logging.NewFormatFlag()
|
||||
|
||||
command := &cobra.Command{
|
||||
Use: "backup",
|
||||
Short: "Run the velero pod volume backup",
|
||||
Long: "Run the velero pod volume backup",
|
||||
Hidden: true,
|
||||
Run: func(c *cobra.Command, args []string) {
|
||||
logLevel := logLevelFlag.Parse()
|
||||
logrus.Infof("Setting log-level to %s", strings.ToUpper(logLevel.String()))
|
||||
|
||||
logger := logging.DefaultLogger(logLevel, formatFlag.Parse())
|
||||
logger.Infof("Starting Velero pod volume backup %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())
|
||||
|
||||
f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
|
||||
s, err := newPodVolumeBackup(logger, f, config)
|
||||
if err != nil {
|
||||
kube.ExitPodWithMessage(logger, false, "Failed to create pod volume backup, %v", err)
|
||||
}
|
||||
|
||||
s.run()
|
||||
},
|
||||
}
|
||||
|
||||
command.Flags().Var(logLevelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(logLevelFlag.AllowedValues(), ", ")))
|
||||
command.Flags().Var(formatFlag, "log-format", fmt.Sprintf("The format for log output. Valid values are %s.", strings.Join(formatFlag.AllowedValues(), ", ")))
|
||||
command.Flags().StringVar(&config.volumePath, "volume-path", config.volumePath, "The full path of the volume to be backed up")
|
||||
command.Flags().StringVar(&config.pvbName, "pod-volume-backup", config.pvbName, "The PVB name")
|
||||
command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters.")
|
||||
|
||||
_ = command.MarkFlagRequired("volume-path")
|
||||
_ = command.MarkFlagRequired("pod-volume-backup")
|
||||
_ = command.MarkFlagRequired("resource-timeout")
|
||||
|
||||
return command
|
||||
}
|
||||
|
||||
type podVolumeBackup struct {
|
||||
logger logrus.FieldLogger
|
||||
ctx context.Context
|
||||
cancelFunc context.CancelFunc
|
||||
client ctlclient.Client
|
||||
cache ctlcache.Cache
|
||||
namespace string
|
||||
nodeName string
|
||||
config podVolumeBackupConfig
|
||||
kubeClient kubernetes.Interface
|
||||
dataPathMgr *datapath.Manager
|
||||
}
|
||||
|
||||
func newPodVolumeBackup(logger logrus.FieldLogger, factory client.Factory, config podVolumeBackupConfig) (*podVolumeBackup, error) {
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
|
||||
clientConfig, err := factory.ClientConfig()
|
||||
if err != nil {
|
||||
cancelFunc()
|
||||
return nil, errors.Wrap(err, "error to create client config")
|
||||
}
|
||||
|
||||
ctrl.SetLogger(logrusr.New(logger))
|
||||
klog.SetLogger(logrusr.New(logger)) // klog.Logger is used by k8s.io/client-go
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
if err := velerov1api.AddToScheme(scheme); err != nil {
|
||||
cancelFunc()
|
||||
return nil, errors.Wrap(err, "error to add velero v1 scheme")
|
||||
}
|
||||
|
||||
if err := corev1api.AddToScheme(scheme); err != nil {
|
||||
cancelFunc()
|
||||
return nil, errors.Wrap(err, "error to add core v1 scheme")
|
||||
}
|
||||
|
||||
nodeName := os.Getenv("NODE_NAME")
|
||||
|
||||
// use a field selector to filter to only pods scheduled on this node.
|
||||
cacheOption := ctlcache.Options{
|
||||
Scheme: scheme,
|
||||
ByObject: map[ctlclient.Object]ctlcache.ByObject{
|
||||
&corev1api.Pod{}: {
|
||||
Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
|
||||
},
|
||||
&velerov1api.PodVolumeBackup{}: {
|
||||
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
cli, err := ctlclient.New(clientConfig, ctlclient.Options{
|
||||
Scheme: scheme,
|
||||
})
|
||||
if err != nil {
|
||||
cancelFunc()
|
||||
return nil, errors.Wrap(err, "error to create client")
|
||||
}
|
||||
|
||||
var cache ctlcache.Cache
|
||||
retry := 10
|
||||
for {
|
||||
cache, err = ctlcache.New(clientConfig, cacheOption)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
retry--
|
||||
if retry == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
logger.WithError(err).Warn("Failed to create client cache, need retry")
|
||||
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
cancelFunc()
|
||||
return nil, errors.Wrap(err, "error to create client cache")
|
||||
}
|
||||
|
||||
s := &podVolumeBackup{
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
cancelFunc: cancelFunc,
|
||||
client: cli,
|
||||
cache: cache,
|
||||
config: config,
|
||||
namespace: factory.Namespace(),
|
||||
nodeName: nodeName,
|
||||
}
|
||||
|
||||
s.kubeClient, err = factory.KubeClient()
|
||||
if err != nil {
|
||||
cancelFunc()
|
||||
return nil, errors.Wrap(err, "error to create kube client")
|
||||
}
|
||||
|
||||
s.dataPathMgr = datapath.NewManager(1)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
var funcExitWithMessage = kube.ExitPodWithMessage
|
||||
var funcCreateDataPathService = (*podVolumeBackup).createDataPathService
|
||||
|
||||
func (s *podVolumeBackup) run() {
|
||||
signals.CancelOnShutdown(s.cancelFunc, s.logger)
|
||||
go func() {
|
||||
if err := s.cache.Start(s.ctx); err != nil {
|
||||
s.logger.WithError(err).Warn("error starting cache")
|
||||
}
|
||||
}()
|
||||
|
||||
s.runDataPath()
|
||||
}
|
||||
|
||||
func (s *podVolumeBackup) runDataPath() {
|
||||
s.logger.Infof("Starting micro service in node %s for PVB %s", s.nodeName, s.config.pvbName)
|
||||
|
||||
dpService, err := funcCreateDataPathService(s)
|
||||
if err != nil {
|
||||
s.cancelFunc()
|
||||
funcExitWithMessage(s.logger, false, "Failed to create data path service for PVB %s: %v", s.config.pvbName, err)
|
||||
return
|
||||
}
|
||||
|
||||
s.logger.Infof("Starting data path service %s", s.config.pvbName)
|
||||
|
||||
err = dpService.Init()
|
||||
if err != nil {
|
||||
dpService.Shutdown()
|
||||
s.cancelFunc()
|
||||
funcExitWithMessage(s.logger, false, "Failed to init data path service for PVB %s: %v", s.config.pvbName, err)
|
||||
return
|
||||
}
|
||||
|
||||
s.logger.Infof("Running data path service %s", s.config.pvbName)
|
||||
|
||||
result, err := dpService.RunCancelableDataPath(s.ctx)
|
||||
if err != nil {
|
||||
dpService.Shutdown()
|
||||
s.cancelFunc()
|
||||
funcExitWithMessage(s.logger, false, "Failed to run data path service for PVB %s: %v", s.config.pvbName, err)
|
||||
return
|
||||
}
|
||||
|
||||
s.logger.WithField("PVB", s.config.pvbName).Info("Data path service completed")
|
||||
|
||||
dpService.Shutdown()
|
||||
|
||||
s.logger.WithField("PVB", s.config.pvbName).Info("Data path service is shut down")
|
||||
|
||||
s.cancelFunc()
|
||||
|
||||
funcExitWithMessage(s.logger, true, result)
|
||||
}
|
||||
|
||||
var funcNewCredentialFileStore = credentials.NewNamespacedFileStore
|
||||
var funcNewCredentialSecretStore = credentials.NewNamespacedSecretStore
|
||||
|
||||
func (s *podVolumeBackup) createDataPathService() (dataPathService, error) {
|
||||
credentialFileStore, err := funcNewCredentialFileStore(
|
||||
s.client,
|
||||
s.namespace,
|
||||
credentials.DefaultStoreDirectory(),
|
||||
filesystem.NewFileSystem(),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error to create credential file store")
|
||||
}
|
||||
|
||||
credSecretStore, err := funcNewCredentialSecretStore(s.client, s.namespace)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error to create credential secret store")
|
||||
}
|
||||
|
||||
credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
|
||||
|
||||
pvbInformer, err := s.cache.GetInformer(s.ctx, &velerov1api.PodVolumeBackup{})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error to get controller-runtime informer from manager")
|
||||
}
|
||||
|
||||
repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout)
|
||||
|
||||
return podvolume.NewBackupMicroService(s.ctx, s.client, s.kubeClient, s.config.pvbName, s.namespace, s.nodeName, datapath.AccessPoint{
|
||||
ByPath: s.config.volumePath,
|
||||
VolMode: uploader.PersistentVolumeFilesystem,
|
||||
}, s.dataPathMgr, repoEnsurer, credGetter, pvbInformer, s.logger), nil
|
||||
}
|
|
@ -0,0 +1,216 @@
|
|||
/*
|
||||
Copyright The Velero Contributors.
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podvolume
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
cacheMock "github.com/vmware-tanzu/velero/pkg/cmd/cli/datamover/mocks"
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
)
|
||||
|
||||
func fakeCreateDataPathServiceWithErr(_ *podVolumeBackup) (dataPathService, error) {
|
||||
return nil, errors.New("fake-create-data-path-error")
|
||||
}
|
||||
|
||||
var frHelper *fakeRunHelper
|
||||
|
||||
func fakeCreateDataPathService(_ *podVolumeBackup) (dataPathService, error) {
|
||||
return frHelper, nil
|
||||
}
|
||||
|
||||
type fakeRunHelper struct {
|
||||
initErr error
|
||||
runCancelableDataPathErr error
|
||||
runCancelableDataPathResult string
|
||||
exitMessage string
|
||||
succeed bool
|
||||
}
|
||||
|
||||
func (fr *fakeRunHelper) Init() error {
|
||||
return fr.initErr
|
||||
}
|
||||
|
||||
func (fr *fakeRunHelper) RunCancelableDataPath(_ context.Context) (string, error) {
|
||||
if fr.runCancelableDataPathErr != nil {
|
||||
return "", fr.runCancelableDataPathErr
|
||||
} else {
|
||||
return fr.runCancelableDataPathResult, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (fr *fakeRunHelper) Shutdown() {
|
||||
|
||||
}
|
||||
|
||||
func (fr *fakeRunHelper) ExitWithMessage(logger logrus.FieldLogger, succeed bool, message string, a ...any) {
|
||||
fr.succeed = succeed
|
||||
fr.exitMessage = fmt.Sprintf(message, a...)
|
||||
}
|
||||
|
||||
func TestRunDataPath(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
pvbName string
|
||||
createDataPathFail bool
|
||||
initDataPathErr error
|
||||
runCancelableDataPathErr error
|
||||
runCancelableDataPathResult string
|
||||
expectedMessage string
|
||||
expectedSucceed bool
|
||||
}{
|
||||
{
|
||||
name: "create data path failed",
|
||||
pvbName: "fake-name",
|
||||
createDataPathFail: true,
|
||||
expectedMessage: "Failed to create data path service for PVB fake-name: fake-create-data-path-error",
|
||||
},
|
||||
{
|
||||
name: "init data path failed",
|
||||
pvbName: "fake-name",
|
||||
initDataPathErr: errors.New("fake-init-data-path-error"),
|
||||
expectedMessage: "Failed to init data path service for PVB fake-name: fake-init-data-path-error",
|
||||
},
|
||||
{
|
||||
name: "run data path failed",
|
||||
pvbName: "fake-name",
|
||||
runCancelableDataPathErr: errors.New("fake-run-data-path-error"),
|
||||
expectedMessage: "Failed to run data path service for PVB fake-name: fake-run-data-path-error",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
pvbName: "fake-name",
|
||||
runCancelableDataPathResult: "fake-run-data-path-result",
|
||||
expectedMessage: "fake-run-data-path-result",
|
||||
expectedSucceed: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
frHelper = &fakeRunHelper{
|
||||
initErr: test.initDataPathErr,
|
||||
runCancelableDataPathErr: test.runCancelableDataPathErr,
|
||||
runCancelableDataPathResult: test.runCancelableDataPathResult,
|
||||
}
|
||||
|
||||
if test.createDataPathFail {
|
||||
funcCreateDataPathService = fakeCreateDataPathServiceWithErr
|
||||
} else {
|
||||
funcCreateDataPathService = fakeCreateDataPathService
|
||||
}
|
||||
|
||||
funcExitWithMessage = frHelper.ExitWithMessage
|
||||
|
||||
s := &podVolumeBackup{
|
||||
logger: velerotest.NewLogger(),
|
||||
cancelFunc: func() {},
|
||||
config: podVolumeBackupConfig{
|
||||
pvbName: test.pvbName,
|
||||
},
|
||||
}
|
||||
|
||||
s.runDataPath()
|
||||
|
||||
assert.Equal(t, test.expectedMessage, frHelper.exitMessage)
|
||||
assert.Equal(t, test.expectedSucceed, frHelper.succeed)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeCreateDataPathServiceHelper struct {
|
||||
fileStoreErr error
|
||||
secretStoreErr error
|
||||
}
|
||||
|
||||
func (fc *fakeCreateDataPathServiceHelper) NewNamespacedFileStore(_ ctlclient.Client, _ string, _ string, _ filesystem.Interface) (credentials.FileStore, error) {
|
||||
return nil, fc.fileStoreErr
|
||||
}
|
||||
|
||||
func (fc *fakeCreateDataPathServiceHelper) NewNamespacedSecretStore(_ ctlclient.Client, _ string) (credentials.SecretStore, error) {
|
||||
return nil, fc.secretStoreErr
|
||||
}
|
||||
|
||||
func TestCreateDataPathService(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
fileStoreErr error
|
||||
secretStoreErr error
|
||||
mockGetInformer bool
|
||||
getInformerErr error
|
||||
expectedError string
|
||||
}{
|
||||
{
|
||||
name: "create credential file store error",
|
||||
fileStoreErr: errors.New("fake-file-store-error"),
|
||||
expectedError: "error to create credential file store: fake-file-store-error",
|
||||
},
|
||||
{
|
||||
name: "create credential secret store",
|
||||
secretStoreErr: errors.New("fake-secret-store-error"),
|
||||
expectedError: "error to create credential secret store: fake-secret-store-error",
|
||||
},
|
||||
{
|
||||
name: "get informer error",
|
||||
mockGetInformer: true,
|
||||
getInformerErr: errors.New("fake-get-informer-error"),
|
||||
expectedError: "error to get controller-runtime informer from manager: fake-get-informer-error",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
mockGetInformer: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
fcHelper := &fakeCreateDataPathServiceHelper{
|
||||
fileStoreErr: test.fileStoreErr,
|
||||
secretStoreErr: test.secretStoreErr,
|
||||
}
|
||||
|
||||
funcNewCredentialFileStore = fcHelper.NewNamespacedFileStore
|
||||
funcNewCredentialSecretStore = fcHelper.NewNamespacedSecretStore
|
||||
|
||||
cache := cacheMock.NewCache(t)
|
||||
if test.mockGetInformer {
|
||||
cache.On("GetInformer", mock.Anything, mock.Anything).Return(nil, test.getInformerErr)
|
||||
}
|
||||
|
||||
funcExitWithMessage = frHelper.ExitWithMessage
|
||||
|
||||
s := &podVolumeBackup{
|
||||
cache: cache,
|
||||
}
|
||||
|
||||
_, err := s.createDataPathService()
|
||||
|
||||
if test.expectedError != "" {
|
||||
assert.EqualError(t, err, test.expectedError)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
Copyright The Velero Contributors.
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podvolume
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/client"
|
||||
)
|
||||
|
||||
func NewCommand(f client.Factory) *cobra.Command {
|
||||
command := &cobra.Command{
|
||||
Use: "pod-volume",
|
||||
Short: "Run the velero pod volume backup/restore",
|
||||
Long: "Run the velero pod volume backup/restore",
|
||||
Hidden: true,
|
||||
}
|
||||
|
||||
command.AddCommand(
|
||||
NewBackupCommand(f),
|
||||
)
|
||||
|
||||
return command
|
||||
}
|
||||
|
||||
type dataPathService interface {
|
||||
Init() error
|
||||
RunCancelableDataPath(context.Context) (string, error)
|
||||
Shutdown()
|
||||
}
|
|
@ -121,7 +121,7 @@ func initRepoManager(namespace string, cli client.Client, kubeClient kubernetes.
|
|||
credentialFileStore, err := credentials.NewNamespacedFileStore(
|
||||
cli,
|
||||
namespace,
|
||||
"/tmp/credentials",
|
||||
credentials.DefaultStoreDirectory(),
|
||||
filesystem.NewFileSystem(),
|
||||
)
|
||||
if err != nil {
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
"github.com/vmware-tanzu/velero/pkg/cmd/util/flag"
|
||||
"github.com/vmware-tanzu/velero/pkg/constant"
|
||||
podvolumeconfigs "github.com/vmware-tanzu/velero/pkg/podvolume/configs"
|
||||
|
@ -47,10 +48,6 @@ const (
|
|||
defaultMaxConcurrentK8SConnections = 30
|
||||
defaultDisableInformerCache = false
|
||||
|
||||
// defaultCredentialsDirectory is the path on disk where credential
|
||||
// files will be written to
|
||||
defaultCredentialsDirectory = "/tmp/credentials"
|
||||
|
||||
DefaultKeepLatestMaintenanceJobs = 3
|
||||
DefaultMaintenanceJobCPURequest = "0"
|
||||
DefaultMaintenanceJobCPULimit = "0"
|
||||
|
@ -216,7 +213,7 @@ func GetDefaultConfig() *Config {
|
|||
DefaultSnapshotMoveData: false,
|
||||
DisableInformerCache: defaultDisableInformerCache,
|
||||
ScheduleSkipImmediately: false,
|
||||
CredentialsDirectory: defaultCredentialsDirectory,
|
||||
CredentialsDirectory: credentials.DefaultStoreDirectory(),
|
||||
PodResources: kube.PodResources{
|
||||
CPURequest: DefaultMaintenanceJobCPULimit,
|
||||
CPULimit: DefaultMaintenanceJobCPURequest,
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/cmd/cli/debug"
|
||||
"github.com/vmware-tanzu/velero/pkg/cmd/cli/podvolume"
|
||||
"github.com/vmware-tanzu/velero/pkg/cmd/cli/repomantenance"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/client"
|
||||
|
@ -126,6 +127,7 @@ operations can also be performed as 'velero backup get' and 'velero schedule cre
|
|||
debug.NewCommand(f),
|
||||
repomantenance.NewCommand(f),
|
||||
datamover.NewCommand(f),
|
||||
podvolume.NewCommand(f),
|
||||
)
|
||||
|
||||
// init and add the klog flags
|
||||
|
|
|
@ -56,6 +56,7 @@ import (
|
|||
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/logging"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/results"
|
||||
veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -424,6 +425,13 @@ func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logg
|
|||
request.Status.ValidationErrors = append(request.Status.ValidationErrors,
|
||||
fmt.Sprintf("backup can't be created because backup storage location %s is currently in read-only mode", request.StorageLocation.Name))
|
||||
}
|
||||
|
||||
if !veleroutil.BSLIsAvailable(*request.StorageLocation) {
|
||||
request.Status.ValidationErrors = append(
|
||||
request.Status.ValidationErrors,
|
||||
fmt.Sprintf("backup can't be created because BackupStorageLocation %s is in Unavailable status.", request.StorageLocation.Name),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// add the storage location as a label for easy filtering later.
|
||||
|
|
|
@ -155,7 +155,7 @@ func TestProcessBackupNonProcessedItems(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestProcessBackupValidationFailures(t *testing.T) {
|
||||
defaultBackupLocation := builder.ForBackupStorageLocation("velero", "loc-1").Result()
|
||||
defaultBackupLocation := builder.ForBackupStorageLocation("velero", "loc-1").Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
|
@ -183,7 +183,7 @@ func TestProcessBackupValidationFailures(t *testing.T) {
|
|||
{
|
||||
name: "backup for read-only backup location fails validation",
|
||||
backup: defaultBackup().StorageLocation("read-only").Result(),
|
||||
backupLocation: builder.ForBackupStorageLocation("velero", "read-only").AccessMode(velerov1api.BackupStorageLocationAccessModeReadOnly).Result(),
|
||||
backupLocation: builder.ForBackupStorageLocation("velero", "read-only").AccessMode(velerov1api.BackupStorageLocationAccessModeReadOnly).Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result(),
|
||||
expectedErrs: []string{"backup can't be created because backup storage location read-only is currently in read-only mode"},
|
||||
},
|
||||
{
|
||||
|
@ -203,6 +203,12 @@ func TestProcessBackupValidationFailures(t *testing.T) {
|
|||
backupLocation: defaultBackupLocation,
|
||||
expectedErrs: []string{"include-resources, exclude-resources and include-cluster-resources are old filter parameters.\ninclude-cluster-scoped-resources, exclude-cluster-scoped-resources, include-namespace-scoped-resources and exclude-namespace-scoped-resources are new filter parameters.\nThey cannot be used together"},
|
||||
},
|
||||
{
|
||||
name: "BSL in unavailable state",
|
||||
backup: defaultBackup().StorageLocation("unavailable").Result(),
|
||||
backupLocation: builder.ForBackupStorageLocation("velero", "unavailable").Phase(velerov1api.BackupStorageLocationPhaseUnavailable).Result(),
|
||||
expectedErrs: []string{"backup can't be created because BackupStorageLocation unavailable is in Unavailable status."},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
|
@ -655,7 +661,7 @@ func TestDefaultVolumesToResticDeprecation(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestProcessBackupCompletions(t *testing.T) {
|
||||
defaultBackupLocation := builder.ForBackupStorageLocation("velero", "loc-1").Default(true).Bucket("store-1").Result()
|
||||
defaultBackupLocation := builder.ForBackupStorageLocation("velero", "loc-1").Default(true).Bucket("store-1").Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result()
|
||||
|
||||
now, err := time.Parse(time.RFC1123Z, time.RFC1123Z)
|
||||
require.NoError(t, err)
|
||||
|
@ -715,7 +721,7 @@ func TestProcessBackupCompletions(t *testing.T) {
|
|||
{
|
||||
name: "backup with a specific backup location keeps it",
|
||||
backup: defaultBackup().StorageLocation("alt-loc").Result(),
|
||||
backupLocation: builder.ForBackupStorageLocation("velero", "alt-loc").Bucket("store-1").Result(),
|
||||
backupLocation: builder.ForBackupStorageLocation("velero", "alt-loc").Bucket("store-1").Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result(),
|
||||
defaultVolumesToFsBackup: false,
|
||||
expectedResult: &velerov1api.Backup{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
|
@ -755,6 +761,7 @@ func TestProcessBackupCompletions(t *testing.T) {
|
|||
backupLocation: builder.ForBackupStorageLocation("velero", "read-write").
|
||||
Bucket("store-1").
|
||||
AccessMode(velerov1api.BackupStorageLocationAccessModeReadWrite).
|
||||
Phase(velerov1api.BackupStorageLocationPhaseAvailable).
|
||||
Result(),
|
||||
defaultVolumesToFsBackup: true,
|
||||
expectedResult: &velerov1api.Backup{
|
||||
|
@ -1477,11 +1484,13 @@ func TestProcessBackupCompletions(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
||||
defaultBSL := builder.ForBackupStorageLocation(velerov1api.DefaultNamespace, "bsl").Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result()
|
||||
tests := []struct {
|
||||
name string
|
||||
backup *velerov1api.Backup
|
||||
locations []*velerov1api.VolumeSnapshotLocation
|
||||
defaultLocations map[string]string
|
||||
bsl velerov1api.BackupStorageLocation
|
||||
expectedVolumeSnapshotLocationNames []string // adding these in the expected order will allow to test with better msgs in case of a test failure
|
||||
expectedErrors string
|
||||
expectedSuccess bool
|
||||
|
@ -1495,6 +1504,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
|||
builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "some-name").Provider("fake-provider").Result(),
|
||||
},
|
||||
expectedErrors: "a VolumeSnapshotLocation CRD for the location random-name with the name specified in the backup spec needs to be created before this snapshot can be executed. Error: volumesnapshotlocations.velero.io \"random-name\" not found", expectedSuccess: false,
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
{
|
||||
name: "duplicate locationName per provider: should filter out dups",
|
||||
|
@ -1505,6 +1515,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
|||
},
|
||||
expectedVolumeSnapshotLocationNames: []string{"aws-us-west-1"},
|
||||
expectedSuccess: true,
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
{
|
||||
name: "multiple non-dupe location names per provider should error",
|
||||
|
@ -1516,6 +1527,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
|||
},
|
||||
expectedErrors: "more than one VolumeSnapshotLocation name specified for provider aws: aws-us-west-1; unexpected name was aws-us-east-1",
|
||||
expectedSuccess: false,
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
{
|
||||
name: "no location name for the provider exists, only one VSL for the provider: use it",
|
||||
|
@ -1525,6 +1537,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
|||
},
|
||||
expectedVolumeSnapshotLocationNames: []string{"aws-us-east-1"},
|
||||
expectedSuccess: true,
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
{
|
||||
name: "no location name for the provider exists, no default, more than one VSL for the provider: error",
|
||||
|
@ -1534,6 +1547,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
|||
builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "aws-us-west-1").Provider("aws").Result(),
|
||||
},
|
||||
expectedErrors: "provider aws has more than one possible volume snapshot location, and none were specified explicitly or as a default",
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
{
|
||||
name: "no location name for the provider exists, more than one VSL for the provider: the provider's default should be added",
|
||||
|
@ -1545,11 +1559,13 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
|||
},
|
||||
expectedVolumeSnapshotLocationNames: []string{"aws-us-east-1"},
|
||||
expectedSuccess: true,
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
{
|
||||
name: "no existing location name and no default location name given",
|
||||
backup: defaultBackup().Phase(velerov1api.BackupPhaseNew).Result(),
|
||||
expectedSuccess: true,
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
{
|
||||
name: "multiple location names for a provider, default location name for another provider",
|
||||
|
@ -1561,6 +1577,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
|||
},
|
||||
expectedVolumeSnapshotLocationNames: []string{"aws-us-west-1", "some-name"},
|
||||
expectedSuccess: true,
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
{
|
||||
name: "location name does not correspond to any existing location and snapshotvolume disabled; should return error",
|
||||
|
@ -1572,6 +1589,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
|||
},
|
||||
expectedVolumeSnapshotLocationNames: nil,
|
||||
expectedErrors: "a VolumeSnapshotLocation CRD for the location random-name with the name specified in the backup spec needs to be created before this snapshot can be executed. Error: volumesnapshotlocations.velero.io \"random-name\" not found", expectedSuccess: false,
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
{
|
||||
name: "duplicate locationName per provider and snapshotvolume disabled; should return only one BSL",
|
||||
|
@ -1582,6 +1600,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
|||
},
|
||||
expectedVolumeSnapshotLocationNames: []string{"aws-us-west-1"},
|
||||
expectedSuccess: true,
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
{
|
||||
name: "no location name for the provider exists, only one VSL created and snapshotvolume disabled; should return the VSL",
|
||||
|
@ -1591,6 +1610,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
|||
},
|
||||
expectedVolumeSnapshotLocationNames: []string{"aws-us-east-1"},
|
||||
expectedSuccess: true,
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
{
|
||||
name: "multiple location names for a provider, no default location and backup has no location defined, but snapshotvolume disabled, should return error",
|
||||
|
@ -1601,6 +1621,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
|
|||
},
|
||||
expectedVolumeSnapshotLocationNames: nil,
|
||||
expectedErrors: "provider aws has more than one possible volume snapshot location, and none were specified explicitly or as a default",
|
||||
bsl: *defaultBSL,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -22,9 +22,8 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/util/csi"
|
||||
|
||||
jsonpatch "github.com/evanphx/json-patch/v5"
|
||||
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v7/apis/volumesnapshot/v1"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
|
@ -37,8 +36,6 @@ import (
|
|||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v7/apis/volumesnapshot/v1"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
"github.com/vmware-tanzu/velero/internal/delete"
|
||||
"github.com/vmware-tanzu/velero/internal/volume"
|
||||
|
@ -56,8 +53,10 @@ import (
|
|||
repomanager "github.com/vmware-tanzu/velero/pkg/repository/manager"
|
||||
repotypes "github.com/vmware-tanzu/velero/pkg/repository/types"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/csi"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -202,6 +201,11 @@ func (r *backupDeletionReconciler) Reconcile(ctx context.Context, req ctrl.Reque
|
|||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
if !veleroutil.BSLIsAvailable(*location) {
|
||||
err := r.patchDeleteBackupRequestWithError(ctx, dbr, fmt.Errorf("cannot delete backup because backup storage location %s is currently in Unavailable state", location.Name))
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// if the request object has no labels defined, initialize an empty map since
|
||||
// we will be updating labels
|
||||
if dbr.Labels == nil {
|
||||
|
|
|
@ -126,6 +126,9 @@ func TestBackupDeletionControllerReconcile(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
Status: velerov1api.BackupStorageLocationStatus{
|
||||
Phase: velerov1api.BackupStorageLocationPhaseAvailable,
|
||||
},
|
||||
}
|
||||
dbr := defaultTestDbr()
|
||||
td := setupBackupDeletionControllerTest(t, dbr, location, backup)
|
||||
|
@ -254,7 +257,7 @@ func TestBackupDeletionControllerReconcile(t *testing.T) {
|
|||
|
||||
t.Run("backup storage location is in read-only mode", func(t *testing.T) {
|
||||
backup := builder.ForBackup(velerov1api.DefaultNamespace, "foo").StorageLocation("default").Result()
|
||||
location := builder.ForBackupStorageLocation("velero", "default").AccessMode(velerov1api.BackupStorageLocationAccessModeReadOnly).Result()
|
||||
location := builder.ForBackupStorageLocation("velero", "default").Phase(velerov1api.BackupStorageLocationPhaseAvailable).AccessMode(velerov1api.BackupStorageLocationAccessModeReadOnly).Result()
|
||||
|
||||
td := setupBackupDeletionControllerTest(t, defaultTestDbr(), location, backup)
|
||||
|
||||
|
@ -268,6 +271,24 @@ func TestBackupDeletionControllerReconcile(t *testing.T) {
|
|||
assert.Len(t, res.Status.Errors, 1)
|
||||
assert.Equal(t, "cannot delete backup because backup storage location default is currently in read-only mode", res.Status.Errors[0])
|
||||
})
|
||||
|
||||
t.Run("backup storage location is in unavailable state", func(t *testing.T) {
|
||||
backup := builder.ForBackup(velerov1api.DefaultNamespace, "foo").StorageLocation("default").Result()
|
||||
location := builder.ForBackupStorageLocation("velero", "default").Phase(velerov1api.BackupStorageLocationPhaseUnavailable).Result()
|
||||
|
||||
td := setupBackupDeletionControllerTest(t, defaultTestDbr(), location, backup)
|
||||
|
||||
_, err := td.controller.Reconcile(context.TODO(), td.req)
|
||||
require.NoError(t, err)
|
||||
|
||||
res := &velerov1api.DeleteBackupRequest{}
|
||||
err = td.fakeClient.Get(ctx, td.req.NamespacedName, res)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "Processed", string(res.Status.Phase))
|
||||
assert.Len(t, res.Status.Errors, 1)
|
||||
assert.Equal(t, "cannot delete backup because backup storage location default is currently in Unavailable state", res.Status.Errors[0])
|
||||
})
|
||||
|
||||
t.Run("full delete, no errors", func(t *testing.T) {
|
||||
input := defaultTestDbr()
|
||||
|
||||
|
@ -297,6 +318,9 @@ func TestBackupDeletionControllerReconcile(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
Status: velerov1api.BackupStorageLocationStatus{
|
||||
Phase: velerov1api.BackupStorageLocationPhaseAvailable,
|
||||
},
|
||||
}
|
||||
|
||||
snapshotLocation := &velerov1api.VolumeSnapshotLocation{
|
||||
|
@ -416,6 +440,9 @@ func TestBackupDeletionControllerReconcile(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
Status: velerov1api.BackupStorageLocationStatus{
|
||||
Phase: velerov1api.BackupStorageLocationPhaseAvailable,
|
||||
},
|
||||
}
|
||||
|
||||
snapshotLocation := &velerov1api.VolumeSnapshotLocation{
|
||||
|
@ -518,6 +545,9 @@ func TestBackupDeletionControllerReconcile(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
Status: velerov1api.BackupStorageLocationStatus{
|
||||
Phase: velerov1api.BackupStorageLocationPhaseAvailable,
|
||||
},
|
||||
}
|
||||
|
||||
snapshotLocation := &velerov1api.VolumeSnapshotLocation{
|
||||
|
@ -600,6 +630,9 @@ func TestBackupDeletionControllerReconcile(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
Status: velerov1api.BackupStorageLocationStatus{
|
||||
Phase: velerov1api.BackupStorageLocationPhaseAvailable,
|
||||
},
|
||||
}
|
||||
|
||||
snapshotLocation := &velerov1api.VolumeSnapshotLocation{
|
||||
|
|
|
@ -41,6 +41,7 @@ import (
|
|||
"github.com/vmware-tanzu/velero/pkg/persistence"
|
||||
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero"
|
||||
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
@ -92,6 +93,10 @@ func (b *backupSyncReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
|||
}
|
||||
return ctrl.Result{}, errors.Wrapf(err, "error getting BackupStorageLocation %s", req.String())
|
||||
}
|
||||
if !veleroutil.BSLIsAvailable(*location) {
|
||||
log.Errorf("BackupStorageLocation is in unavailable state, skip syncing backup from it.")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
pluginManager := b.newPluginManager(log)
|
||||
defer pluginManager.CleanupClients()
|
||||
|
|
|
@ -62,6 +62,9 @@ func defaultLocation(namespace string) *velerov1api.BackupStorageLocation {
|
|||
},
|
||||
Default: true,
|
||||
},
|
||||
Status: velerov1api.BackupStorageLocationStatus{
|
||||
Phase: velerov1api.BackupStorageLocationPhaseAvailable,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,6 +144,9 @@ func defaultLocationWithLongerLocationName(namespace string) *velerov1api.Backup
|
|||
},
|
||||
},
|
||||
},
|
||||
Status: velerov1api.BackupStorageLocationStatus{
|
||||
Phase: velerov1api.BackupStorageLocationPhaseAvailable,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -177,6 +183,21 @@ var _ = Describe("Backup Sync Reconciler", func() {
|
|||
namespace: "ns-1",
|
||||
location: defaultLocation("ns-1"),
|
||||
},
|
||||
{
|
||||
name: "unavailable BSL",
|
||||
namespace: "ns-1",
|
||||
location: builder.ForBackupStorageLocation("ns-1", "default").Phase(velerov1api.BackupStorageLocationPhaseUnavailable).Result(),
|
||||
cloudBackups: []*cloudBackupData{
|
||||
{
|
||||
backup: builder.ForBackup("ns-1", "backup-1").Result(),
|
||||
backupShouldSkipSync: true,
|
||||
},
|
||||
{
|
||||
backup: builder.ForBackup("ns-1", "backup-2").Result(),
|
||||
backupShouldSkipSync: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "normal case",
|
||||
namespace: "ns-1",
|
||||
|
|
|
@ -18,6 +18,7 @@ package controller
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
@ -36,6 +37,7 @@ import (
|
|||
"github.com/vmware-tanzu/velero/pkg/constant"
|
||||
"github.com/vmware-tanzu/velero/pkg/label"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -44,6 +46,7 @@ const (
|
|||
gcFailureBSLNotFound = "BSLNotFound"
|
||||
gcFailureBSLCannotGet = "BSLCannotGet"
|
||||
gcFailureBSLReadOnly = "BSLReadOnly"
|
||||
gcFailureBSLUnavailable = "BSLUnavailable"
|
||||
)
|
||||
|
||||
// gcReconciler creates DeleteBackupRequests for expired backups.
|
||||
|
@ -144,12 +147,18 @@ func (c *gcReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re
|
|||
} else {
|
||||
backup.Labels[garbageCollectionFailure] = gcFailureBSLCannotGet
|
||||
}
|
||||
|
||||
if err := c.Update(ctx, backup); err != nil {
|
||||
log.WithError(err).Error("error updating backup labels")
|
||||
}
|
||||
return ctrl.Result{}, errors.Wrap(err, "error getting backup storage location")
|
||||
}
|
||||
|
||||
if !veleroutil.BSLIsAvailable(*loc) {
|
||||
log.Infof("BSL %s is unavailable, cannot gc backup", loc.Name)
|
||||
return ctrl.Result{}, fmt.Errorf("bsl %s is unavailable, cannot gc backup", loc.Name)
|
||||
}
|
||||
|
||||
if loc.Spec.AccessMode == velerov1api.BackupStorageLocationAccessModeReadOnly {
|
||||
log.Infof("Backup cannot be garbage-collected because backup storage location %s is currently in read-only mode", loc.Name)
|
||||
backup.Labels[garbageCollectionFailure] = gcFailureBSLReadOnly
|
||||
|
|
|
@ -46,7 +46,7 @@ func mockGCReconciler(fakeClient kbclient.Client, fakeClock *testclocks.FakeCloc
|
|||
|
||||
func TestGCReconcile(t *testing.T) {
|
||||
fakeClock := testclocks.NewFakeClock(time.Now())
|
||||
defaultBackupLocation := builder.ForBackupStorageLocation(velerov1api.DefaultNamespace, "default").Result()
|
||||
defaultBackupLocation := builder.ForBackupStorageLocation(velerov1api.DefaultNamespace, "default").Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
|
@ -66,12 +66,12 @@ func TestGCReconcile(t *testing.T) {
|
|||
{
|
||||
name: "expired backup in read-only storage location is not deleted",
|
||||
backup: defaultBackup().Expiration(fakeClock.Now().Add(-time.Minute)).StorageLocation("read-only").Result(),
|
||||
backupLocation: builder.ForBackupStorageLocation("velero", "read-only").AccessMode(velerov1api.BackupStorageLocationAccessModeReadOnly).Result(),
|
||||
backupLocation: builder.ForBackupStorageLocation("velero", "read-only").AccessMode(velerov1api.BackupStorageLocationAccessModeReadOnly).Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result(),
|
||||
},
|
||||
{
|
||||
name: "expired backup in read-write storage location is deleted",
|
||||
backup: defaultBackup().Expiration(fakeClock.Now().Add(-time.Minute)).StorageLocation("read-write").Result(),
|
||||
backupLocation: builder.ForBackupStorageLocation("velero", "read-write").AccessMode(velerov1api.BackupStorageLocationAccessModeReadWrite).Result(),
|
||||
backupLocation: builder.ForBackupStorageLocation("velero", "read-write").AccessMode(velerov1api.BackupStorageLocationAccessModeReadWrite).Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result(),
|
||||
},
|
||||
{
|
||||
name: "expired backup with no pending deletion requests is deleted",
|
||||
|
@ -118,6 +118,12 @@ func TestGCReconcile(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "BSL is unavailable",
|
||||
backup: defaultBackup().Expiration(fakeClock.Now().Add(-time.Second)).StorageLocation("default").Result(),
|
||||
backupLocation: builder.ForBackupStorageLocation(velerov1api.DefaultNamespace, "default").Phase(velerov1api.BackupStorageLocationPhaseUnavailable).Result(),
|
||||
expectError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
|
|
|
@ -58,6 +58,7 @@ import (
|
|||
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/logging"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/results"
|
||||
veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero"
|
||||
pkgrestoreUtil "github.com/vmware-tanzu/velero/pkg/util/velero/restore"
|
||||
)
|
||||
|
||||
|
@ -393,6 +394,11 @@ func (r *restoreReconciler) validateAndComplete(restore *api.Restore) (backupInf
|
|||
return backupInfo{}, nil
|
||||
}
|
||||
|
||||
if !veleroutil.BSLIsAvailable(*info.location) {
|
||||
restore.Status.ValidationErrors = append(restore.Status.ValidationErrors, fmt.Sprintf("The BSL %s is unavailable, cannot retrieve the backup", info.location.Name))
|
||||
return backupInfo{}, nil
|
||||
}
|
||||
|
||||
// Fill in the ScheduleName so it's easier to consume for metrics.
|
||||
if restore.Spec.ScheduleName == "" {
|
||||
restore.Spec.ScheduleName = info.backup.GetLabels()[api.ScheduleNameLabel]
|
||||
|
@ -728,6 +734,10 @@ func (r *restoreReconciler) deleteExternalResources(restore *api.Restore) error
|
|||
return errors.Wrap(err, fmt.Sprintf("can't get backup info, backup: %s", restore.Spec.BackupName))
|
||||
}
|
||||
|
||||
if !veleroutil.BSLIsAvailable(*backupInfo.location) {
|
||||
return fmt.Errorf("bsl %s is unavailable, cannot get the backup info", backupInfo.location.Name)
|
||||
}
|
||||
|
||||
// delete restore files in object storage
|
||||
pluginManager := r.newPluginManager(r.logger)
|
||||
defer pluginManager.CleanupClients()
|
||||
|
|
|
@ -66,7 +66,7 @@ func TestFetchBackupInfo(t *testing.T) {
|
|||
{
|
||||
name: "lister has backup",
|
||||
backupName: "backup-1",
|
||||
informerLocations: []*velerov1api.BackupStorageLocation{builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()},
|
||||
informerLocations: []*velerov1api.BackupStorageLocation{builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result()},
|
||||
informerBackups: []*velerov1api.Backup{defaultBackup().StorageLocation("default").Result()},
|
||||
expectedRes: defaultBackup().StorageLocation("default").Result(),
|
||||
},
|
||||
|
@ -74,7 +74,7 @@ func TestFetchBackupInfo(t *testing.T) {
|
|||
name: "lister does not have a backup, but backupSvc does",
|
||||
backupName: "backup-1",
|
||||
backupStoreBackup: defaultBackup().StorageLocation("default").Result(),
|
||||
informerLocations: []*velerov1api.BackupStorageLocation{builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()},
|
||||
informerLocations: []*velerov1api.BackupStorageLocation{builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result()},
|
||||
informerBackups: []*velerov1api.Backup{defaultBackup().StorageLocation("default").Result()},
|
||||
expectedRes: defaultBackup().StorageLocation("default").Result(),
|
||||
},
|
||||
|
@ -211,7 +211,7 @@ func TestProcessQueueItemSkips(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestRestoreReconcile(t *testing.T) {
|
||||
defaultStorageLocation := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()
|
||||
defaultStorageLocation := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result()
|
||||
|
||||
now, err := time.Parse(time.RFC1123Z, time.RFC1123Z)
|
||||
require.NoError(t, err)
|
||||
|
@ -464,6 +464,22 @@ func TestRestoreReconcile(t *testing.T) {
|
|||
expectedCompletedTime: ×tamp,
|
||||
expectedRestorerCall: NewRestore("foo", "bar", "backup-1", "ns-1", "", velerov1api.RestorePhaseInProgress).Result(),
|
||||
},
|
||||
{
|
||||
name: "Restore creation is rejected when BSL is unavailable",
|
||||
location: builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Phase(velerov1api.BackupStorageLocationPhaseUnavailable).Result(),
|
||||
restore: NewRestore("foo", "bar", "backup-1", "ns-1", "", velerov1api.RestorePhaseNew).Result(),
|
||||
backup: defaultBackup().StorageLocation("default").Result(),
|
||||
expectedErr: false,
|
||||
expectedPhase: string(velerov1api.RestorePhaseNew),
|
||||
expectedValidationErrors: []string{"The BSL default is unavailable, cannot retrieve the backup"},
|
||||
},
|
||||
{
|
||||
name: "Restore deletion is rejected when BSL is unavailable.",
|
||||
location: builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Phase(velerov1api.BackupStorageLocationPhaseUnavailable).Result(),
|
||||
restore: NewRestore("foo", "bar", "backup-1", "ns-1", "", velerov1api.RestorePhaseCompleted).ObjectMeta(builder.WithFinalizers(ExternalResourcesFinalizer), builder.WithDeletionTimestamp(timestamp.Time)).Result(),
|
||||
backup: defaultBackup().StorageLocation("default").Result(),
|
||||
expectedErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
formatFlag := logging.FormatText
|
||||
|
@ -738,7 +754,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
|
|||
Result(),
|
||||
))
|
||||
|
||||
location := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()
|
||||
location := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result()
|
||||
require.NoError(t, r.kbClient.Create(context.Background(), location))
|
||||
|
||||
restore = &velerov1api.Restore{
|
||||
|
@ -797,7 +813,7 @@ func TestValidateAndCompleteWithResourceModifierSpecified(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
location := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()
|
||||
location := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result()
|
||||
require.NoError(t, r.kbClient.Create(context.Background(), location))
|
||||
|
||||
require.NoError(t, r.kbClient.Create(
|
||||
|
|
|
@ -0,0 +1,313 @@
|
|||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podvolume
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
cachetool "k8s.io/client-go/tools/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/datapath"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
podVolumeRequestor = "snapshot-pod-volume"
|
||||
)
|
||||
|
||||
// BackupMicroService process data mover backups inside the backup pod
|
||||
type BackupMicroService struct {
|
||||
ctx context.Context
|
||||
client client.Client
|
||||
kubeClient kubernetes.Interface
|
||||
repoEnsurer *repository.Ensurer
|
||||
credentialGetter *credentials.CredentialGetter
|
||||
logger logrus.FieldLogger
|
||||
dataPathMgr *datapath.Manager
|
||||
eventRecorder kube.EventRecorder
|
||||
|
||||
namespace string
|
||||
pvbName string
|
||||
pvb *velerov1api.PodVolumeBackup
|
||||
sourceTargetPath datapath.AccessPoint
|
||||
|
||||
resultSignal chan dataPathResult
|
||||
|
||||
pvbInformer cache.Informer
|
||||
pvbHandler cachetool.ResourceEventHandlerRegistration
|
||||
nodeName string
|
||||
}
|
||||
|
||||
type dataPathResult struct {
|
||||
err error
|
||||
result string
|
||||
}
|
||||
|
||||
func NewBackupMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, pvbName string, namespace string, nodeName string,
|
||||
sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter,
|
||||
pvbInformer cache.Informer, log logrus.FieldLogger) *BackupMicroService {
|
||||
return &BackupMicroService{
|
||||
ctx: ctx,
|
||||
client: client,
|
||||
kubeClient: kubeClient,
|
||||
credentialGetter: cred,
|
||||
logger: log,
|
||||
repoEnsurer: repoEnsurer,
|
||||
dataPathMgr: dataPathMgr,
|
||||
namespace: namespace,
|
||||
pvbName: pvbName,
|
||||
sourceTargetPath: sourceTargetPath,
|
||||
nodeName: nodeName,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
pvbInformer: pvbInformer,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) Init() error {
|
||||
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.pvbName, r.nodeName, r.logger)
|
||||
|
||||
handler, err := r.pvbInformer.AddEventHandler(
|
||||
cachetool.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(oldObj any, newObj any) {
|
||||
oldPvb := oldObj.(*velerov1api.PodVolumeBackup)
|
||||
newPvb := newObj.(*velerov1api.PodVolumeBackup)
|
||||
|
||||
if newPvb.Name != r.pvbName {
|
||||
return
|
||||
}
|
||||
|
||||
if newPvb.Status.Phase != velerov1api.PodVolumeBackupPhaseInProgress {
|
||||
return
|
||||
}
|
||||
|
||||
if newPvb.Spec.Cancel && !oldPvb.Spec.Cancel {
|
||||
r.cancelPodVolumeBackup(newPvb)
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error adding PVB handler")
|
||||
}
|
||||
|
||||
r.pvbHandler = handler
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
|
||||
log := r.logger.WithFields(logrus.Fields{
|
||||
"PVB": r.pvbName,
|
||||
})
|
||||
|
||||
pvb := &velerov1api.PodVolumeBackup{}
|
||||
err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) {
|
||||
err := r.client.Get(ctx, types.NamespacedName{
|
||||
Namespace: r.namespace,
|
||||
Name: r.pvbName,
|
||||
}, pvb)
|
||||
|
||||
if apierrors.IsNotFound(err) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return true, errors.Wrapf(err, "error to get PVB %s", r.pvbName)
|
||||
}
|
||||
|
||||
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseInProgress {
|
||||
return true, nil
|
||||
} else {
|
||||
return false, nil
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to wait PVB")
|
||||
return "", errors.Wrap(err, "error waiting for PVB")
|
||||
}
|
||||
|
||||
r.pvb = pvb
|
||||
|
||||
log.Info("Run cancelable PVB")
|
||||
|
||||
callbacks := datapath.Callbacks{
|
||||
OnCompleted: r.OnDataPathCompleted,
|
||||
OnFailed: r.OnDataPathFailed,
|
||||
OnCancelled: r.OnDataPathCancelled,
|
||||
OnProgress: r.OnDataPathProgress,
|
||||
}
|
||||
|
||||
fsBackup, err := r.dataPathMgr.CreateFileSystemBR(pvb.Name, podVolumeRequestor, ctx, r.client, pvb.Namespace, callbacks, log)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "error to create data path")
|
||||
}
|
||||
|
||||
log.Debug("Async fs br created")
|
||||
|
||||
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
|
||||
BSLName: pvb.Spec.BackupStorageLocation,
|
||||
SourceNamespace: pvb.Spec.Pod.Namespace,
|
||||
UploaderType: pvb.Spec.UploaderType,
|
||||
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
|
||||
RepoIdentifier: "",
|
||||
RepositoryEnsurer: r.repoEnsurer,
|
||||
CredentialGetter: r.credentialGetter,
|
||||
}); err != nil {
|
||||
return "", errors.Wrap(err, "error to initialize data path")
|
||||
}
|
||||
|
||||
log.Info("Async fs br init")
|
||||
|
||||
tags := map[string]string{}
|
||||
|
||||
if err := fsBackup.StartBackup(r.sourceTargetPath, pvb.Spec.UploaderSettings, &datapath.FSBRStartParam{
|
||||
RealSource: GetRealSource(pvb),
|
||||
ParentSnapshot: "",
|
||||
ForceFull: false,
|
||||
Tags: tags,
|
||||
}); err != nil {
|
||||
return "", errors.Wrap(err, "error starting data path backup")
|
||||
}
|
||||
|
||||
log.Info("Async fs backup data path started")
|
||||
r.eventRecorder.Event(pvb, false, datapath.EventReasonStarted, "Data path for %s started", pvb.Name)
|
||||
|
||||
result := ""
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = errors.New("timed out waiting for fs backup to complete")
|
||||
break
|
||||
case res := <-r.resultSignal:
|
||||
err = res.err
|
||||
result = res.result
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Async fs backup was not completed")
|
||||
}
|
||||
|
||||
r.eventRecorder.EndingEvent(pvb, false, datapath.EventReasonStopped, "Data path for %s stopped", pvb.Name)
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) Shutdown() {
|
||||
r.eventRecorder.Shutdown()
|
||||
r.closeDataPath(r.ctx, r.pvbName)
|
||||
|
||||
if r.pvbHandler != nil {
|
||||
if err := r.pvbInformer.RemoveEventHandler(r.pvbHandler); err != nil {
|
||||
r.logger.WithError(err).Warn("Failed to remove pod handler")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var funcMarshal = json.Marshal
|
||||
|
||||
func (r *BackupMicroService) OnDataPathCompleted(ctx context.Context, namespace string, pvbName string, result datapath.Result) {
|
||||
log := r.logger.WithField("PVB", pvbName)
|
||||
|
||||
backupBytes, err := funcMarshal(result.Backup)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("Failed to marshal backup result %v", result.Backup)
|
||||
r.resultSignal <- dataPathResult{
|
||||
err: errors.Wrapf(err, "Failed to marshal backup result %v", result.Backup),
|
||||
}
|
||||
} else {
|
||||
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonCompleted, string(backupBytes))
|
||||
r.resultSignal <- dataPathResult{
|
||||
result: string(backupBytes),
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Async fs backup completed")
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) OnDataPathFailed(ctx context.Context, namespace string, pvbName string, err error) {
|
||||
log := r.logger.WithField("PVB", pvbName)
|
||||
log.WithError(err).Error("Async fs backup data path failed")
|
||||
|
||||
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonFailed, "Data path for PVB %s failed, error %v", r.pvbName, err)
|
||||
r.resultSignal <- dataPathResult{
|
||||
err: errors.Wrapf(err, "Data path for PVB %s failed", r.pvbName),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) OnDataPathCancelled(ctx context.Context, namespace string, pvbName string) {
|
||||
log := r.logger.WithField("PVB", pvbName)
|
||||
log.Warn("Async fs backup data path canceled")
|
||||
|
||||
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonCancelled, "Data path for PVB %s canceled", pvbName)
|
||||
r.resultSignal <- dataPathResult{
|
||||
err: errors.New(datapath.ErrCancelled),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) OnDataPathProgress(ctx context.Context, namespace string, pvbName string, progress *uploader.Progress) {
|
||||
log := r.logger.WithFields(logrus.Fields{
|
||||
"PVB": pvbName,
|
||||
})
|
||||
|
||||
progressBytes, err := funcMarshal(progress)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("Failed to marshal progress %v", progress)
|
||||
return
|
||||
}
|
||||
|
||||
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonProgress, string(progressBytes))
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) {
|
||||
fsBackup := r.dataPathMgr.GetAsyncBR(duName)
|
||||
if fsBackup != nil {
|
||||
fsBackup.Close(ctx)
|
||||
}
|
||||
|
||||
r.dataPathMgr.RemoveAsyncBR(duName)
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) cancelPodVolumeBackup(pvb *velerov1api.PodVolumeBackup) {
|
||||
r.logger.WithField("PVB", pvb.Name).Info("PVB is being canceled")
|
||||
|
||||
r.eventRecorder.Event(pvb, false, datapath.EventReasonCancelling, "Canceling for PVB %s", pvb.Name)
|
||||
|
||||
fsBackup := r.dataPathMgr.GetAsyncBR(pvb.Name)
|
||||
if fsBackup == nil {
|
||||
r.OnDataPathCancelled(r.ctx, pvb.GetNamespace(), pvb.GetName())
|
||||
} else {
|
||||
fsBackup.Cancel()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,447 @@
|
|||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podvolume
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/datapath"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
|
||||
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
|
||||
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
|
||||
)
|
||||
|
||||
type backupMsTestHelper struct {
|
||||
eventReason string
|
||||
eventMsg string
|
||||
marshalErr error
|
||||
marshalBytes []byte
|
||||
withEvent bool
|
||||
eventLock sync.Mutex
|
||||
}
|
||||
|
||||
func (bt *backupMsTestHelper) Event(_ runtime.Object, _ bool, reason string, message string, a ...any) {
|
||||
bt.eventLock.Lock()
|
||||
defer bt.eventLock.Unlock()
|
||||
|
||||
bt.withEvent = true
|
||||
bt.eventReason = reason
|
||||
bt.eventMsg = fmt.Sprintf(message, a...)
|
||||
}
|
||||
|
||||
func (bt *backupMsTestHelper) EndingEvent(_ runtime.Object, _ bool, reason string, message string, a ...any) {
|
||||
bt.eventLock.Lock()
|
||||
defer bt.eventLock.Unlock()
|
||||
|
||||
bt.withEvent = true
|
||||
bt.eventReason = reason
|
||||
bt.eventMsg = fmt.Sprintf(message, a...)
|
||||
}
|
||||
func (bt *backupMsTestHelper) Shutdown() {}
|
||||
|
||||
func (bt *backupMsTestHelper) Marshal(v any) ([]byte, error) {
|
||||
if bt.marshalErr != nil {
|
||||
return nil, bt.marshalErr
|
||||
}
|
||||
|
||||
return bt.marshalBytes, nil
|
||||
}
|
||||
|
||||
func (bt *backupMsTestHelper) EventReason() string {
|
||||
bt.eventLock.Lock()
|
||||
defer bt.eventLock.Unlock()
|
||||
|
||||
return bt.eventReason
|
||||
}
|
||||
|
||||
func (bt *backupMsTestHelper) EventMessage() string {
|
||||
bt.eventLock.Lock()
|
||||
defer bt.eventLock.Unlock()
|
||||
|
||||
return bt.eventMsg
|
||||
}
|
||||
|
||||
func TestOnDataPathFailed(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
bt := &backupMsTestHelper{}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
pvbName: pvbName,
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
expectedErr := "Data path for PVB fake-pvb failed: fake-error"
|
||||
expectedEventReason := datapath.EventReasonFailed
|
||||
expectedEventMsg := "Data path for PVB fake-pvb failed, error fake-error"
|
||||
|
||||
go bs.OnDataPathFailed(context.TODO(), velerov1api.DefaultNamespace, pvbName, errors.New("fake-error"))
|
||||
|
||||
result := <-bs.resultSignal
|
||||
assert.EqualError(t, result.err, expectedErr)
|
||||
assert.Equal(t, expectedEventReason, bt.EventReason())
|
||||
assert.Equal(t, expectedEventMsg, bt.EventMessage())
|
||||
}
|
||||
|
||||
func TestOnDataPathCancelled(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
bt := &backupMsTestHelper{}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
pvbName: pvbName,
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
expectedErr := datapath.ErrCancelled
|
||||
expectedEventReason := datapath.EventReasonCancelled
|
||||
expectedEventMsg := "Data path for PVB fake-pvb canceled"
|
||||
|
||||
go bs.OnDataPathCancelled(context.TODO(), velerov1api.DefaultNamespace, pvbName)
|
||||
|
||||
result := <-bs.resultSignal
|
||||
assert.EqualError(t, result.err, expectedErr)
|
||||
assert.Equal(t, expectedEventReason, bt.EventReason())
|
||||
assert.Equal(t, expectedEventMsg, bt.EventMessage())
|
||||
}
|
||||
|
||||
func TestOnDataPathCompleted(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
expectedErr string
|
||||
expectedEventReason string
|
||||
expectedEventMsg string
|
||||
marshalErr error
|
||||
marshallStr string
|
||||
}{
|
||||
{
|
||||
name: "marshal fail",
|
||||
marshalErr: errors.New("fake-marshal-error"),
|
||||
expectedErr: "Failed to marshal backup result { false { } 0}: fake-marshal-error",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
marshallStr: "fake-complete-string",
|
||||
expectedEventReason: datapath.EventReasonCompleted,
|
||||
expectedEventMsg: "fake-complete-string",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
|
||||
bt := &backupMsTestHelper{
|
||||
marshalErr: test.marshalErr,
|
||||
marshalBytes: []byte(test.marshallStr),
|
||||
}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
funcMarshal = bt.Marshal
|
||||
|
||||
go bs.OnDataPathCompleted(context.TODO(), velerov1api.DefaultNamespace, pvbName, datapath.Result{})
|
||||
|
||||
result := <-bs.resultSignal
|
||||
if test.marshalErr != nil {
|
||||
assert.EqualError(t, result.err, test.expectedErr)
|
||||
} else {
|
||||
assert.NoError(t, result.err)
|
||||
assert.Equal(t, test.expectedEventReason, bt.EventReason())
|
||||
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnDataPathProgress(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
expectedErr string
|
||||
expectedEventReason string
|
||||
expectedEventMsg string
|
||||
marshalErr error
|
||||
marshallStr string
|
||||
}{
|
||||
{
|
||||
name: "marshal fail",
|
||||
marshalErr: errors.New("fake-marshal-error"),
|
||||
expectedErr: "Failed to marshal backup result",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
marshallStr: "fake-progress-string",
|
||||
expectedEventReason: datapath.EventReasonProgress,
|
||||
expectedEventMsg: "fake-progress-string",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
|
||||
bt := &backupMsTestHelper{
|
||||
marshalErr: test.marshalErr,
|
||||
marshalBytes: []byte(test.marshallStr),
|
||||
}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
funcMarshal = bt.Marshal
|
||||
|
||||
bs.OnDataPathProgress(context.TODO(), velerov1api.DefaultNamespace, pvbName, &uploader.Progress{})
|
||||
|
||||
if test.marshalErr != nil {
|
||||
assert.False(t, bt.withEvent)
|
||||
} else {
|
||||
assert.True(t, bt.withEvent)
|
||||
assert.Equal(t, test.expectedEventReason, bt.EventReason())
|
||||
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCancelPodVolumeBackup(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
expectedEventReason string
|
||||
expectedEventMsg string
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "no fs backup",
|
||||
expectedEventReason: datapath.EventReasonCancelled,
|
||||
expectedEventMsg: "Data path for PVB fake-pvb canceled",
|
||||
expectedErr: datapath.ErrCancelled,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
pvb := builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, pvbName).Result()
|
||||
|
||||
bt := &backupMsTestHelper{}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
go bs.cancelPodVolumeBackup(pvb)
|
||||
|
||||
result := <-bs.resultSignal
|
||||
|
||||
assert.EqualError(t, result.err, test.expectedErr)
|
||||
assert.True(t, bt.withEvent)
|
||||
assert.Equal(t, test.expectedEventReason, bt.EventReason())
|
||||
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunCancelableDataPath(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
pvb := builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, pvbName).Phase(velerov1api.PodVolumeBackupPhaseNew).Result()
|
||||
pvbInProgress := builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, pvbName).Phase(velerov1api.PodVolumeBackupPhaseInProgress).Result()
|
||||
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
ctx context.Context
|
||||
result *dataPathResult
|
||||
dataPathMgr *datapath.Manager
|
||||
kubeClientObj []runtime.Object
|
||||
initErr error
|
||||
startErr error
|
||||
dataPathStarted bool
|
||||
expectedEventMsg string
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "no pvb",
|
||||
ctx: ctxTimeout,
|
||||
expectedErr: "error waiting for PVB: context deadline exceeded",
|
||||
},
|
||||
{
|
||||
name: "pvb not in in-progress",
|
||||
ctx: ctxTimeout,
|
||||
kubeClientObj: []runtime.Object{pvb},
|
||||
expectedErr: "error waiting for PVB: context deadline exceeded",
|
||||
},
|
||||
{
|
||||
name: "create data path fail",
|
||||
ctx: context.Background(),
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
dataPathMgr: datapath.NewManager(0),
|
||||
expectedErr: "error to create data path: Concurrent number exceeds",
|
||||
},
|
||||
{
|
||||
name: "init data path fail",
|
||||
ctx: context.Background(),
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
initErr: errors.New("fake-init-error"),
|
||||
expectedErr: "error to initialize data path: fake-init-error",
|
||||
},
|
||||
{
|
||||
name: "start data path fail",
|
||||
ctx: context.Background(),
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
startErr: errors.New("fake-start-error"),
|
||||
expectedErr: "error starting data path backup: fake-start-error",
|
||||
},
|
||||
{
|
||||
name: "data path timeout",
|
||||
ctx: ctxTimeout,
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
dataPathStarted: true,
|
||||
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvbName),
|
||||
expectedErr: "timed out waiting for fs backup to complete",
|
||||
},
|
||||
{
|
||||
name: "data path returns error",
|
||||
ctx: context.Background(),
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
dataPathStarted: true,
|
||||
result: &dataPathResult{
|
||||
err: errors.New("fake-data-path-error"),
|
||||
},
|
||||
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvbName),
|
||||
expectedErr: "fake-data-path-error",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
ctx: context.Background(),
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
dataPathStarted: true,
|
||||
result: &dataPathResult{
|
||||
result: "fake-succeed-result",
|
||||
},
|
||||
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvbName),
|
||||
},
|
||||
}
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
velerov1api.AddToScheme(scheme)
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
fakeClientBuilder := clientFake.NewClientBuilder()
|
||||
fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
|
||||
|
||||
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
|
||||
|
||||
bt := &backupMsTestHelper{}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
namespace: velerov1api.DefaultNamespace,
|
||||
pvbName: pvbName,
|
||||
ctx: context.Background(),
|
||||
client: fakeClient,
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
if test.ctx != nil {
|
||||
bs.ctx = test.ctx
|
||||
}
|
||||
|
||||
if test.dataPathMgr != nil {
|
||||
bs.dataPathMgr = test.dataPathMgr
|
||||
}
|
||||
|
||||
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
fsBR := datapathmockes.NewAsyncBR(t)
|
||||
if test.initErr != nil {
|
||||
fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr)
|
||||
}
|
||||
|
||||
if test.startErr != nil {
|
||||
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
|
||||
fsBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(test.startErr)
|
||||
}
|
||||
|
||||
if test.dataPathStarted {
|
||||
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
|
||||
fsBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
}
|
||||
|
||||
return fsBR
|
||||
}
|
||||
|
||||
if test.result != nil {
|
||||
go func() {
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
bs.resultSignal <- *test.result
|
||||
}()
|
||||
}
|
||||
|
||||
result, err := bs.RunCancelableDataPath(test.ctx)
|
||||
|
||||
if test.expectedErr != "" {
|
||||
assert.EqualError(t, err, test.expectedErr)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, test.result.result, result)
|
||||
}
|
||||
|
||||
if test.expectedEventMsg != "" {
|
||||
assert.True(t, bt.withEvent)
|
||||
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
cancel()
|
||||
}
|
|
@ -17,12 +17,14 @@ limitations under the License.
|
|||
package podvolume
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/podvolume/configs"
|
||||
repotypes "github.com/vmware-tanzu/velero/pkg/repository/types"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
)
|
||||
|
@ -143,6 +145,19 @@ func GetSnapshotIdentifier(podVolumeBackups *velerov1api.PodVolumeBackupList) ma
|
|||
return res
|
||||
}
|
||||
|
||||
func GetRealSource(pvb *velerov1api.PodVolumeBackup) string {
|
||||
pvcName := ""
|
||||
if pvb.Annotations != nil {
|
||||
pvcName = pvb.Annotations[configs.PVCNameAnnotation]
|
||||
}
|
||||
|
||||
if pvcName != "" {
|
||||
return fmt.Sprintf("%s/%s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvcName)
|
||||
} else {
|
||||
return fmt.Sprintf("%s/%s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvb.Spec.Volume)
|
||||
}
|
||||
}
|
||||
|
||||
func getUploaderTypeOrDefault(uploaderType string) string {
|
||||
if uploaderType != "" {
|
||||
return uploaderType
|
||||
|
|
|
@ -300,3 +300,34 @@ func TestVolumeHasNonRestorableSource(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetRealSource(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
pvb *velerov1api.PodVolumeBackup
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "pvb with empty annotation",
|
||||
pvb: builder.ForPodVolumeBackup("fake-ns", "fake-name").PodNamespace("fake-pod-ns").PodName("fake-pod-name").Volume("fake-volume").Result(),
|
||||
expected: "fake-pod-ns/fake-pod-name/fake-volume",
|
||||
},
|
||||
{
|
||||
name: "pvb without pvc name annotation",
|
||||
pvb: builder.ForPodVolumeBackup("fake-ns", "fake-name").PodNamespace("fake-pod-ns").PodName("fake-pod-name").Volume("fake-volume").Annotations(map[string]string{}).Result(),
|
||||
expected: "fake-pod-ns/fake-pod-name/fake-volume",
|
||||
},
|
||||
{
|
||||
name: "pvb with pvc name annotation",
|
||||
pvb: builder.ForPodVolumeBackup("fake-ns", "fake-name").PodNamespace("fake-pod-ns").PodName("fake-pod-name").Volume("fake-volume").Annotations(map[string]string{"velero.io/pvc-name": "fake-pvc-name"}).Result(),
|
||||
expected: "fake-pod-ns/fake-pod-name/fake-pvc-name",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
actual := GetRealSource(tc.pvb)
|
||||
assert.Equal(t, tc.expected, actual)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
@ -273,3 +274,30 @@ func DiagnosePod(pod *corev1api.Pod) string {
|
|||
|
||||
return diag
|
||||
}
|
||||
|
||||
var funcExit = os.Exit
|
||||
var funcCreateFile = os.Create
|
||||
|
||||
func ExitPodWithMessage(logger logrus.FieldLogger, succeed bool, message string, a ...any) {
|
||||
exitCode := 0
|
||||
if !succeed {
|
||||
exitCode = 1
|
||||
}
|
||||
|
||||
toWrite := fmt.Sprintf(message, a...)
|
||||
|
||||
podFile, err := funcCreateFile("/dev/termination-log")
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("Failed to create termination log file")
|
||||
exitCode = 1
|
||||
} else {
|
||||
if _, err := podFile.WriteString(toWrite); err != nil {
|
||||
logger.WithError(err).Error("Failed to write error to termination log file")
|
||||
exitCode = 1
|
||||
}
|
||||
|
||||
podFile.Close()
|
||||
}
|
||||
|
||||
funcExit(exitCode)
|
||||
}
|
||||
|
|
|
@ -18,14 +18,19 @@ package kube
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
|
@ -932,3 +937,105 @@ func TestDiagnosePod(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
type exitWithMessageMock struct {
|
||||
createErr error
|
||||
writeFail bool
|
||||
filePath string
|
||||
exitCode int
|
||||
}
|
||||
|
||||
func (em *exitWithMessageMock) Exit(code int) {
|
||||
em.exitCode = code
|
||||
}
|
||||
|
||||
func (em *exitWithMessageMock) CreateFile(name string) (*os.File, error) {
|
||||
if em.createErr != nil {
|
||||
return nil, em.createErr
|
||||
}
|
||||
|
||||
if em.writeFail {
|
||||
return os.OpenFile(em.filePath, os.O_CREATE|os.O_RDONLY, 0500)
|
||||
} else {
|
||||
return os.Create(em.filePath)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExitPodWithMessage(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
message string
|
||||
succeed bool
|
||||
args []any
|
||||
createErr error
|
||||
writeFail bool
|
||||
expectedExitCode int
|
||||
expectedMessage string
|
||||
}{
|
||||
{
|
||||
name: "create pod file failed",
|
||||
createErr: errors.New("fake-create-file-error"),
|
||||
succeed: true,
|
||||
expectedExitCode: 1,
|
||||
},
|
||||
{
|
||||
name: "write pod file failed",
|
||||
writeFail: true,
|
||||
succeed: true,
|
||||
expectedExitCode: 1,
|
||||
},
|
||||
{
|
||||
name: "not succeed",
|
||||
message: "fake-message-1, arg-1 %s, arg-2 %v, arg-3 %v",
|
||||
args: []any{
|
||||
"arg-1-1",
|
||||
10,
|
||||
false,
|
||||
},
|
||||
expectedExitCode: 1,
|
||||
expectedMessage: fmt.Sprintf("fake-message-1, arg-1 %s, arg-2 %v, arg-3 %v", "arg-1-1", 10, false),
|
||||
},
|
||||
{
|
||||
name: "not succeed",
|
||||
message: "fake-message-2, arg-1 %s, arg-2 %v, arg-3 %v",
|
||||
args: []any{
|
||||
"arg-1-2",
|
||||
20,
|
||||
true,
|
||||
},
|
||||
succeed: true,
|
||||
expectedMessage: fmt.Sprintf("fake-message-2, arg-1 %s, arg-2 %v, arg-3 %v", "arg-1-2", 20, true),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
podFile := filepath.Join(os.TempDir(), uuid.NewString())
|
||||
|
||||
em := exitWithMessageMock{
|
||||
createErr: test.createErr,
|
||||
writeFail: test.writeFail,
|
||||
filePath: podFile,
|
||||
}
|
||||
|
||||
funcExit = em.Exit
|
||||
funcCreateFile = em.CreateFile
|
||||
|
||||
ExitPodWithMessage(velerotest.NewLogger(), test.succeed, test.message, test.args...)
|
||||
|
||||
assert.Equal(t, test.expectedExitCode, em.exitCode)
|
||||
|
||||
if test.createErr == nil && !test.writeFail {
|
||||
reader, err := os.Open(podFile)
|
||||
require.NoError(t, err)
|
||||
|
||||
message, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
reader.Close()
|
||||
|
||||
assert.Equal(t, test.expectedMessage, string(message))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@ package velero
|
|||
import (
|
||||
appsv1api "k8s.io/api/apps/v1"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
)
|
||||
|
||||
// GetNodeSelectorFromVeleroServer get the node selector from the Velero server deployment
|
||||
|
@ -105,3 +107,7 @@ func GetVeleroServerAnnotationValue(deployment *appsv1api.Deployment, key string
|
|||
|
||||
return deployment.Spec.Template.Annotations[key]
|
||||
}
|
||||
|
||||
func BSLIsAvailable(bsl velerov1api.BackupStorageLocation) bool {
|
||||
return bsl.Status.Phase == velerov1api.BackupStorageLocationPhaseAvailable
|
||||
}
|
||||
|
|
|
@ -24,6 +24,9 @@ import (
|
|||
appsv1api "k8s.io/api/apps/v1"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
)
|
||||
|
||||
func TestGetNodeSelectorFromVeleroServer(t *testing.T) {
|
||||
|
@ -759,3 +762,11 @@ func TestGetVeleroServerLabelValue(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBSLIsAvailable(t *testing.T) {
|
||||
availableBSL := builder.ForBackupStorageLocation("velero", "available").Phase(velerov1api.BackupStorageLocationPhaseAvailable).Result()
|
||||
unavailableBSL := builder.ForBackupStorageLocation("velero", "unavailable").Phase(velerov1api.BackupStorageLocationPhaseUnavailable).Result()
|
||||
|
||||
assert.True(t, BSLIsAvailable(*availableBSL))
|
||||
assert.False(t, BSLIsAvailable(*unavailableBSL))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue