Merge branch 'main' into issue-fix-6341

pull/6368/head
Lyndon-Li 2023-06-15 10:11:47 +08:00
commit b00633976b
59 changed files with 3313 additions and 113 deletions

View File

@ -5,7 +5,9 @@
## Overview
Velero (formerly Heptio Ark) gives you tools to back up and restore your Kubernetes cluster resources and persistent volumes. You can run Velero with a public cloud platform or on-premises. Velero lets you:
Velero (formerly Heptio Ark) gives you tools to back up and restore your Kubernetes cluster resources and persistent volumes. You can run Velero with a public cloud platform or on-premises.
Velero lets you:
* Take backups of your cluster and restore in case of loss.
* Migrate cluster resources to other clusters.
@ -18,7 +20,7 @@ Velero consists of:
## Documentation
[The documentation][29] provides a getting started guide and information about building from source, architecture, extending Velero, and more.
[The documentation][29] provides a getting started guide and information about building from source, architecture, extending Velero and more.
Please use the version selector at the top of the site to ensure you are using the appropriate documentation for your version of Velero.

View File

@ -0,0 +1 @@
Add default values for defaultItemOperationTimeout and itemOperationSyncFrequency in velero CLI

View File

@ -0,0 +1 @@
Update metrics when backup failed with validation error

View File

@ -0,0 +1 @@
Add the code for data mover restore expose

View File

@ -0,0 +1 @@
Add data mover related options in CLI

View File

@ -0,0 +1 @@
Do not persist VolumeSnapshot and VolumeSnapshotContent for snapshot DataMover case.

View File

@ -0,0 +1 @@
Change kopia as the default path of PVB

View File

@ -0,0 +1 @@
Add unit test for pkg/uploader

View File

@ -0,0 +1 @@
Add warning message for volume snapshotter in data mover case.

View File

@ -337,10 +337,9 @@ Therefore, for the new path, Velero uses the information in the BackupStorageLoc
The legacy path will be kept as is. That is, Velero still sets/gets the repoIdentififer in BackupRepository CRs, PodVolume Backup CRs and PodVolume Restore CRs and then passes to Restic CLI.
## Installation
We will add a new flag "--pod-volume-backup-uploader" during installation. The flag has 3 meanings:
- It indicates PodVolume BR as the default method to protect PV data over other methods, i.e., durable snapshot. Therefore, the existing --use-restic option will be replaced
We will add a new flag "--uploader-type" during installation. The flag has 2 meanings:
- It indicates the file system uploader to be used by PodVolume BR
- It implies the backup repository type manner, Restic if pod-volume-backup-uploader=restic, Unified Repository in all other cases
- It implies the backup repository type manner, Restic if uploader-type=restic, Unified Repository in all other cases
The flag has below two values:
**"Restic"**: it means Velero will use Restic to do the pod volume backup. Therefore, the Velero server deployment will be created as below:
@ -470,7 +469,7 @@ Below sample files demonstrate complete CRs with all the changes mentioned above
## User Perspective
This design aims to provide a flexible backup repository layer and a generic file system uploader, which are fundermental for PodVolume and other data movements. Although this will make Velero more capable, at present, we don't pursue to expose differentiated features end to end. Specifically:
- By default, Velero still uses Restic for PodVolume BR
- For a fresh installation, if the "--uploader-type" is not specified, there is a default value for PodVolume BR. We will keep it as "restic" for at least one release, then we switch the value to "kopia"
- Even when changing to the new path, Velero still allows users to restore from the data backed up by Restic
- The capability of PodVolume BR under the new path is kept the same as it under Restic path and the same as the existing PodVolume BR
- The operational experiences are kept the same as much as possible, the known changes are listed below

View File

@ -26,6 +26,8 @@ kind: Deployment
metadata:
name: nginx-deployment
namespace: nginx-example
labels:
app: nginx
spec:
replicas: 2
selector:

View File

@ -42,5 +42,5 @@ fi
# but the user and group don't exist inside the container, when the code(https://github.com/kubernetes-sigs/controller-runtime/blob/v0.10.2/pkg/internal/testing/addr/manager.go#L44)
# tries to get the cache directory, it gets the directory "/" and then get the permission error when trying to create directory under "/".
# Specifying the cache directory by environment variable "XDG_CACHE_HOME" to workaround it
XDG_CACHE_HOME=/tmp/ go test -installsuffix "static" -short -timeout 60s -coverprofile=coverage.out "${TARGETS[@]}"
XDG_CACHE_HOME=/tmp/ go test -installsuffix "static" -short -timeout 120s -coverprofile=coverage.out "${TARGETS[@]}"
echo "Success!"

View File

@ -480,6 +480,13 @@ func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.Fie
return nil
}
// TODO: Snapshot data mover is only supported for CSI plugin scenario by now.
// Need to add a mechanism to choose running which plugin for resources.
// After that, this warning can be removed.
if boolptr.IsSetToTrue(ib.backupRequest.Spec.SnapshotMoveData) {
log.Warnf("VolumeSnapshotter plugin doesn't support data movement.")
}
if ib.backupRequest.ResPolicies != nil {
if action, err := ib.backupRequest.ResPolicies.GetMatchAction(pv); err != nil {
log.WithError(err).Errorf("Error getting matched resource policies for pv %s", pv.Name)

View File

@ -287,3 +287,15 @@ func (b *BackupBuilder) ResourcePolicies(name string) *BackupBuilder {
b.object.Spec.ResourcePolicy = &v1.TypedLocalObjectReference{Kind: resourcepolicies.ConfigmapRefType, Name: name}
return b
}
// SnapshotMoveData sets the Backup's "snapshot move data" flag.
func (b *BackupBuilder) SnapshotMoveData(val bool) *BackupBuilder {
b.object.Spec.SnapshotMoveData = &val
return b
}
// DataMover sets the Backup's data mover
func (b *BackupBuilder) DataMover(name string) *BackupBuilder {
b.object.Spec.DataMover = name
return b
}

View File

@ -67,3 +67,9 @@ func (b *PersistentVolumeClaimBuilder) StorageClass(name string) *PersistentVolu
b.object.Spec.StorageClassName = &name
return b
}
// Phase sets the PersistentVolumeClaim's status Phase.
func (b *PersistentVolumeClaimBuilder) Phase(phase corev1api.PersistentVolumeClaimPhase) *PersistentVolumeClaimBuilder {
b.object.Status.Phase = phase
return b
}

View File

@ -81,3 +81,9 @@ func ForStorageClassSlice(names ...string) *StorageClassBuilder {
func (b *StorageClassBuilder) SliceResult() []*storagev1api.StorageClass {
return b.objectSlice
}
// Provisioner sets StorageClass's provisioner.
func (b *StorageClassBuilder) Provisioner(provisioner string) *StorageClassBuilder {
b.object.Provisioner = provisioner
return b
}

View File

@ -0,0 +1,62 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package builder
import (
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// VolumeSnapshotClassBuilder builds VolumeSnapshotClass objects.
type VolumeSnapshotClassBuilder struct {
object *snapshotv1api.VolumeSnapshotClass
}
// ForVolumeSnapshotClass is the constructor of VolumeSnapshotClassBuilder.
func ForVolumeSnapshotClass(name string) *VolumeSnapshotClassBuilder {
return &VolumeSnapshotClassBuilder{
object: &snapshotv1api.VolumeSnapshotClass{
TypeMeta: metav1.TypeMeta{
Kind: "VolumeSnapshotClass",
APIVersion: snapshotv1api.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
},
}
}
// Result returns the built VolumeSnapshotClass.
func (b *VolumeSnapshotClassBuilder) Result() *snapshotv1api.VolumeSnapshotClass {
return b.object
}
// Driver sets the driver of built VolumeSnapshotClass.
func (b *VolumeSnapshotClassBuilder) Driver(driver string) *VolumeSnapshotClassBuilder {
b.object.Driver = driver
return b
}
// ObjectMeta applies functional options to the VolumeSnapshotClass's ObjectMeta.
func (b *VolumeSnapshotClassBuilder) ObjectMeta(opts ...ObjectMetaOpt) *VolumeSnapshotClassBuilder {
for _, opt := range opts {
opt(b.object)
}
return b
}

View File

@ -17,6 +17,9 @@ limitations under the License.
package client
import (
"os"
"reflect"
"strings"
"testing"
"github.com/stretchr/testify/assert"
@ -32,3 +35,81 @@ func TestVeleroConfig(t *testing.T) {
assert.Equal(t, []string{"feature1", "feature2"}, c.Features())
assert.Equal(t, true, c.Colorized())
}
func removeConfigfileName() error {
// Remove config file if it exist
configFile := configFileName()
e := os.Remove(configFile)
if e != nil {
if !os.IsNotExist(e) {
return e
}
}
return nil
}
func TestConfigOperations(t *testing.T) {
preHomeEnv := ""
prevEnv := os.Environ()
for _, entry := range prevEnv {
parts := strings.SplitN(entry, "=", 2)
if len(parts) == 2 && parts[0] == "HOME" {
preHomeEnv = parts[1]
break
}
}
os.Unsetenv("HOME")
os.Setenv("HOME", ".")
// Remove config file if it exists
err := removeConfigfileName()
assert.Equal(t, err, nil)
// Test LoadConfig: expect an empty velero config
expectedConfig := VeleroConfig{}
config, err := LoadConfig()
assert.Equal(t, err, nil)
assert.True(t, reflect.DeepEqual(expectedConfig, config))
// Test savedConfig
expectedFeature := "EnableCSI"
expectedColorized := true
expectedNamespace := "ns-velero"
expectedCACert := "ca-cert"
config[ConfigKeyFeatures] = expectedFeature
config[ConfigKeyColorized] = expectedColorized
config[ConfigKeyNamespace] = expectedNamespace
config[ConfigKeyCACert] = expectedCACert
err = SaveConfig(config)
assert.Equal(t, err, nil)
savedConfig, err := LoadConfig()
assert.Equal(t, err, nil)
// Test Features
feature := savedConfig.Features()
assert.Equal(t, 1, len(feature))
assert.Equal(t, expectedFeature, feature[0])
// Test Colorized
colorized := savedConfig.Colorized()
assert.Equal(t, expectedColorized, colorized)
// Test Namespace
namespace := savedConfig.Namespace()
assert.Equal(t, expectedNamespace, namespace)
// Test Features
caCertFile := savedConfig.CACertFile()
assert.Equal(t, expectedCACert, caCertFile)
t.Cleanup(func() {
err = removeConfigfileName()
assert.Equal(t, err, nil)
os.Unsetenv("HOME")
os.Setenv("HOME", preHomeEnv)
})
}

View File

@ -16,11 +16,16 @@ limitations under the License.
package client
import (
"context"
"fmt"
"os"
"strings"
"testing"
"github.com/spf13/pflag"
flag "github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// TestFactory tests the client.Factory interface.
@ -40,7 +45,7 @@ func TestFactory(t *testing.T) {
// Argument should change the namespace
f = NewFactory("velero", make(map[string]interface{}))
s := "flag-velero"
flags := new(pflag.FlagSet)
flags := new(flag.FlagSet)
f.BindFlags(flags)
@ -51,11 +56,91 @@ func TestFactory(t *testing.T) {
// An argument overrides the env variable if both are set.
os.Setenv("VELERO_NAMESPACE", "env-velero")
f = NewFactory("velero", make(map[string]interface{}))
flags = new(pflag.FlagSet)
flags = new(flag.FlagSet)
f.BindFlags(flags)
flags.Parse([]string{"--namespace", s})
assert.Equal(t, s, f.Namespace())
os.Unsetenv("VELERO_NAMESPACE")
tests := []struct {
name string
kubeconfig string
kubecontext string
QPS float32
burst int
baseName string
expectedHost string
}{
{
name: "Test flag setting in factory ClientConfig (test data #1)",
kubeconfig: "kubeconfig",
kubecontext: "federal-context",
QPS: 1.0,
burst: 1,
baseName: "bn-velero-1",
expectedHost: "https://horse.org:4443",
},
{
name: "Test flag setting in factory ClientConfig (test data #2)",
kubeconfig: "kubeconfig",
kubecontext: "queen-anne-context",
QPS: 200.0,
burst: 20,
baseName: "bn-velero-2",
expectedHost: "https://pig.org:443",
},
}
baseName := "velero-bn"
config, err := LoadConfig()
assert.Equal(t, err, nil)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
f = NewFactory(baseName, config)
f.SetClientBurst(test.burst)
f.SetClientQPS(test.QPS)
f.SetBasename(test.baseName)
flags = new(flag.FlagSet)
f.BindFlags(flags)
flags.Parse([]string{"--kubeconfig", test.kubeconfig, "--kubecontext", test.kubecontext})
clientConfig, _ := f.ClientConfig()
assert.Equal(t, test.expectedHost, clientConfig.Host)
assert.Equal(t, test.QPS, clientConfig.QPS)
assert.Equal(t, test.burst, clientConfig.Burst)
strings.Contains(clientConfig.UserAgent, test.baseName)
client, _ := f.Client()
_, e := client.Discovery().ServerGroups()
assert.Contains(t, e.Error(), fmt.Sprintf("Get \"%s/api?timeout=", test.expectedHost))
assert.NotNil(t, client)
kubeClient, _ := f.KubeClient()
group := kubeClient.NodeV1().RESTClient().APIVersion().Group
assert.NotNil(t, kubeClient)
assert.Equal(t, "node.k8s.io", group)
namespace := "ns1"
dynamicClient, _ := f.DynamicClient()
resource := &schema.GroupVersionResource{
Group: "group_test",
Version: "verion_test",
}
list, e := dynamicClient.Resource(*resource).Namespace(namespace).List(
context.Background(),
metav1.ListOptions{
LabelSelector: "none",
},
)
assert.Contains(t, e.Error(), fmt.Sprintf("Get \"%s/apis/%s/%s/namespaces/%s", test.expectedHost, resource.Group, resource.Version, namespace))
assert.Nil(t, list)
assert.NotNil(t, dynamicClient)
kubebuilderClient, e := f.KubebuilderClient()
assert.Contains(t, e.Error(), fmt.Sprintf("Get \"%s/api?timeout=", test.expectedHost))
assert.Nil(t, kubebuilderClient)
})
}
}

View File

@ -85,6 +85,8 @@ type CreateOptions struct {
Name string
TTL time.Duration
SnapshotVolumes flag.OptionalBool
SnapshotMoveData flag.OptionalBool
DataMover string
DefaultVolumesToFsBackup flag.OptionalBool
IncludeNamespaces flag.StringArray
ExcludeNamespaces flag.StringArray
@ -139,6 +141,9 @@ func (o *CreateOptions) BindFlags(flags *pflag.FlagSet) {
// like a normal bool flag
f.NoOptDefVal = "true"
f = flags.VarPF(&o.SnapshotMoveData, "snapshot-move-data", "", "Specify whether snapshot data should be moved")
f.NoOptDefVal = "true"
f = flags.VarPF(&o.IncludeClusterResources, "include-cluster-resources", "", "Include cluster-scoped resources in the backup. Cannot work with include-cluster-scoped-resources, exclude-cluster-scoped-resources, include-namespace-scoped-resources and exclude-namespace-scoped-resources.")
f.NoOptDefVal = "true"
@ -146,6 +151,7 @@ func (o *CreateOptions) BindFlags(flags *pflag.FlagSet) {
f.NoOptDefVal = "true"
flags.StringVar(&o.ResPoliciesConfigmap, "resource-policies-configmap", "", "Reference to the resource policies configmap that backup using")
flags.StringVar(&o.DataMover, "data-mover", "", "Specify the data mover to be used by the backup. If the parameter is not set or set as 'velero', the built-in data mover will be used")
}
// BindWait binds the wait flag separately so it is not called by other create
@ -359,7 +365,8 @@ func (o *CreateOptions) BuildBackup(namespace string) (*velerov1api.Backup, erro
StorageLocation(o.StorageLocation).
VolumeSnapshotLocations(o.SnapshotLocations...).
CSISnapshotTimeout(o.CSISnapshotTimeout).
ItemOperationTimeout(o.ItemOperationTimeout)
ItemOperationTimeout(o.ItemOperationTimeout).
DataMover(o.DataMover)
if len(o.OrderedResources) > 0 {
orders, err := ParseOrderedResources(o.OrderedResources)
if err != nil {
@ -371,6 +378,9 @@ func (o *CreateOptions) BuildBackup(namespace string) (*velerov1api.Backup, erro
if o.SnapshotVolumes.Value != nil {
backupBuilder.SnapshotVolumes(*o.SnapshotVolumes.Value)
}
if o.SnapshotMoveData.Value != nil {
backupBuilder.SnapshotMoveData(*o.SnapshotMoveData.Value)
}
if o.IncludeClusterResources.Value != nil {
backupBuilder.IncludeClusterResources(*o.IncludeClusterResources.Value)
}

View File

@ -143,7 +143,7 @@ func NewInstallOptions() *Options {
NoDefaultBackupLocation: false,
CRDsOnly: false,
DefaultVolumesToFsBackup: false,
UploaderType: uploader.ResticType,
UploaderType: uploader.KopiaType,
}
}

View File

@ -226,10 +226,10 @@ func NewCommand(f client.Factory) *cobra.Command {
command.Flags().DurationVar(&config.defaultBackupTTL, "default-backup-ttl", config.defaultBackupTTL, "How long to wait by default before backups can be garbage collected.")
command.Flags().DurationVar(&config.repoMaintenanceFrequency, "default-repo-maintain-frequency", config.repoMaintenanceFrequency, "How often 'maintain' is run for backup repositories by default.")
command.Flags().DurationVar(&config.garbageCollectionFrequency, "garbage-collection-frequency", config.garbageCollectionFrequency, "How often garbage collection is run for expired backups.")
command.Flags().DurationVar(&config.itemOperationSyncFrequency, "item-operation-sync-frequency", config.itemOperationSyncFrequency, "How often to check status on backup/restore operations after backup/restore processing.")
command.Flags().DurationVar(&config.itemOperationSyncFrequency, "item-operation-sync-frequency", config.itemOperationSyncFrequency, "How often to check status on backup/restore operations after backup/restore processing. Default is 10 seconds")
command.Flags().BoolVar(&config.defaultVolumesToFsBackup, "default-volumes-to-fs-backup", config.defaultVolumesToFsBackup, "Backup all volumes with pod volume file system backup by default.")
command.Flags().StringVar(&config.uploaderType, "uploader-type", config.uploaderType, "Type of uploader to handle the transfer of data of pod volumes")
command.Flags().DurationVar(&config.defaultItemOperationTimeout, "default-item-operation-timeout", config.defaultItemOperationTimeout, "How long to wait on asynchronous BackupItemActions and RestoreItemActions to complete before timing out.")
command.Flags().DurationVar(&config.defaultItemOperationTimeout, "default-item-operation-timeout", config.defaultItemOperationTimeout, "How long to wait on asynchronous BackupItemActions and RestoreItemActions to complete before timing out. Default is 1 hour")
command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.")
command.Flags().IntVar(&config.maxConcurrentK8SConnections, "max-concurrent-k8s-connections", config.maxConcurrentK8SConnections, "Max concurrent connections number that Velero can create with kube-apiserver. Default is 30.")

View File

@ -206,6 +206,13 @@ func DescribeBackupSpec(d *Describer, spec velerov1api.BackupSpec) {
d.Println()
d.Printf("Velero-Native Snapshot PVs:\t%s\n", BoolPointerString(spec.SnapshotVolumes, "false", "true", "auto"))
d.Printf("Snapshot Move Data:\t%s\n", BoolPointerString(spec.SnapshotMoveData, "false", "true", "auto"))
if len(spec.DataMover) == 0 {
s = emptyDisplay
} else {
s = spec.DataMover
}
d.Printf("Data Mover:\t%s\n", s)
d.Println()
d.Printf("TTL:\t%s\n", spec.TTL.Duration)

View File

@ -137,6 +137,15 @@ func DescribeBackupSpecInSF(d *StructuredDescriber, spec velerov1api.BackupSpec)
// describe snapshot volumes
backupSpecInfo["veleroNativeSnapshotPVs"] = BoolPointerString(spec.SnapshotVolumes, "false", "true", "auto")
// describe snapshot move data
backupSpecInfo["veleroSnapshotMoveData"] = BoolPointerString(spec.SnapshotMoveData, "false", "true", "auto")
// describe data mover
if len(spec.DataMover) == 0 {
s = emptyDisplay
} else {
s = spec.DataMover
}
backupSpecInfo["dataMover"] = s
// describe TTL
backupSpecInfo["TTL"] = spec.TTL.Duration.String()

View File

@ -256,8 +256,13 @@ func (b *backupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
// store ref to just-updated item for creating patch
original = request.Backup.DeepCopy()
backupScheduleName := request.GetLabels()[velerov1api.ScheduleNameLabel]
if request.Status.Phase == velerov1api.BackupPhaseFailedValidation {
log.Debug("failed to validate backup status")
b.metrics.RegisterBackupValidationFailure(backupScheduleName)
b.metrics.RegisterBackupLastStatus(backupScheduleName, metrics.BackupLastStatusFailure)
return ctrl.Result{}, nil
}
@ -271,7 +276,6 @@ func (b *backupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
log.Debug("Running backup")
backupScheduleName := request.GetLabels()[velerov1api.ScheduleNameLabel]
b.metrics.RegisterBackupAttempt(backupScheduleName)
// execution & upload of backup
@ -648,7 +652,9 @@ func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error {
var volumeSnapshots []snapshotv1api.VolumeSnapshot
var volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent
var volumeSnapshotClasses []snapshotv1api.VolumeSnapshotClass
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
if boolptr.IsSetToTrue(backup.Spec.SnapshotMoveData) {
backupLog.Info("backup SnapshotMoveData is set to true, skip VolumeSnapshot resource persistence.")
} else if features.IsEnabled(velerov1api.CSIFeatureFlag) {
selector := label.NewSelectorForBackup(backup.Name)
vscList := &snapshotv1api.VolumeSnapshotContentList{}

View File

@ -26,6 +26,9 @@ import (
"testing"
"time"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake"
snapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v4/informers/externalversions"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
@ -44,6 +47,7 @@ import (
pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/discovery"
"github.com/vmware-tanzu/velero/pkg/features"
"github.com/vmware-tanzu/velero/pkg/itemoperation"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
@ -210,6 +214,7 @@ func TestProcessBackupValidationFailures(t *testing.T) {
defaultBackupLocation: defaultBackupLocation.Name,
clock: &clock.RealClock{},
formatFlag: formatFlag,
metrics: metrics.NewServerMetrics(),
}
require.NotNil(t, test.backup)
@ -581,6 +586,7 @@ func TestProcessBackupCompletions(t *testing.T) {
expectedResult *velerov1api.Backup
backupExists bool
existenceCheckError error
volumeSnapshot *snapshotv1api.VolumeSnapshot
}{
// Finalizing
{
@ -1000,16 +1006,139 @@ func TestProcessBackupCompletions(t *testing.T) {
},
},
},
{
name: "backup with snapshot data movement when CSI feature is enabled",
backup: defaultBackup().SnapshotMoveData(true).Result(),
backupLocation: defaultBackupLocation,
defaultVolumesToFsBackup: false,
expectedResult: &velerov1api.Backup{
TypeMeta: metav1.TypeMeta{
Kind: "Backup",
APIVersion: "velero.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: "backup-1",
Annotations: map[string]string{
"velero.io/source-cluster-k8s-major-version": "1",
"velero.io/source-cluster-k8s-minor-version": "16",
"velero.io/source-cluster-k8s-gitversion": "v1.16.4",
"velero.io/resource-timeout": "0s",
},
Labels: map[string]string{
"velero.io/storage-location": "loc-1",
},
},
Spec: velerov1api.BackupSpec{
StorageLocation: defaultBackupLocation.Name,
DefaultVolumesToFsBackup: boolptr.False(),
SnapshotMoveData: boolptr.True(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseFinalizing,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
CSIVolumeSnapshotsAttempted: 0,
CSIVolumeSnapshotsCompleted: 0,
},
},
volumeSnapshot: builder.ForVolumeSnapshot("velero", "testVS").ObjectMeta(builder.WithLabels(velerov1api.BackupNameLabel, "backup-1")).Result(),
},
{
name: "backup with snapshot data movement set to false when CSI feature is enabled",
backup: defaultBackup().SnapshotMoveData(false).Result(),
//backup: defaultBackup().Result(),
backupLocation: defaultBackupLocation,
defaultVolumesToFsBackup: false,
expectedResult: &velerov1api.Backup{
TypeMeta: metav1.TypeMeta{
Kind: "Backup",
APIVersion: "velero.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: "backup-1",
Annotations: map[string]string{
"velero.io/source-cluster-k8s-major-version": "1",
"velero.io/source-cluster-k8s-minor-version": "16",
"velero.io/source-cluster-k8s-gitversion": "v1.16.4",
"velero.io/resource-timeout": "0s",
},
Labels: map[string]string{
"velero.io/storage-location": "loc-1",
},
},
Spec: velerov1api.BackupSpec{
StorageLocation: defaultBackupLocation.Name,
DefaultVolumesToFsBackup: boolptr.False(),
SnapshotMoveData: boolptr.False(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseFinalizing,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
CSIVolumeSnapshotsAttempted: 1,
CSIVolumeSnapshotsCompleted: 0,
},
},
volumeSnapshot: builder.ForVolumeSnapshot("velero", "testVS").ObjectMeta(builder.WithLabels(velerov1api.BackupNameLabel, "backup-1")).Result(),
},
{
name: "backup with snapshot data movement not set when CSI feature is enabled",
backup: defaultBackup().Result(),
backupLocation: defaultBackupLocation,
defaultVolumesToFsBackup: false,
expectedResult: &velerov1api.Backup{
TypeMeta: metav1.TypeMeta{
Kind: "Backup",
APIVersion: "velero.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: "backup-1",
Annotations: map[string]string{
"velero.io/source-cluster-k8s-major-version": "1",
"velero.io/source-cluster-k8s-minor-version": "16",
"velero.io/source-cluster-k8s-gitversion": "v1.16.4",
"velero.io/resource-timeout": "0s",
},
Labels: map[string]string{
"velero.io/storage-location": "loc-1",
},
},
Spec: velerov1api.BackupSpec{
StorageLocation: defaultBackupLocation.Name,
DefaultVolumesToFsBackup: boolptr.False(),
},
Status: velerov1api.BackupStatus{
Phase: velerov1api.BackupPhaseFinalizing,
Version: 1,
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
CSIVolumeSnapshotsAttempted: 1,
CSIVolumeSnapshotsCompleted: 0,
},
},
volumeSnapshot: builder.ForVolumeSnapshot("velero", "testVS").ObjectMeta(builder.WithLabels(velerov1api.BackupNameLabel, "backup-1")).Result(),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
formatFlag := logging.FormatText
var (
logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag)
pluginManager = new(pluginmocks.Manager)
backupStore = new(persistencemocks.BackupStore)
backupper = new(fakeBackupper)
logger = logging.DefaultLogger(logrus.DebugLevel, formatFlag)
pluginManager = new(pluginmocks.Manager)
backupStore = new(persistencemocks.BackupStore)
backupper = new(fakeBackupper)
snapshotClient = snapshotfake.NewSimpleClientset()
sharedInformer = snapshotinformers.NewSharedInformerFactory(snapshotClient, 0)
snapshotLister = sharedInformer.Snapshot().V1().VolumeSnapshots().Lister()
)
var fakeClient kbclient.Client
@ -1020,6 +1149,12 @@ func TestProcessBackupCompletions(t *testing.T) {
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
}
if test.volumeSnapshot != nil {
snapshotClient.SnapshotV1().VolumeSnapshots(test.volumeSnapshot.Namespace).Create(context.Background(), test.volumeSnapshot, metav1.CreateOptions{})
sharedInformer.Snapshot().V1().VolumeSnapshots().Informer().GetStore().Add(test.volumeSnapshot)
sharedInformer.WaitForCacheSync(make(chan struct{}))
}
apiServer := velerotest.NewAPIServer(t)
apiServer.DiscoveryClient.FakedServerVersion = &version.Info{
@ -1050,6 +1185,8 @@ func TestProcessBackupCompletions(t *testing.T) {
backupStoreGetter: NewFakeSingleObjectBackupStoreGetter(backupStore),
backupper: backupper,
formatFlag: formatFlag,
volumeSnapshotClient: snapshotClient,
volumeSnapshotLister: snapshotLister,
}
pluginManager.On("GetBackupItemActionsV2").Return(nil, nil)
@ -1079,10 +1216,20 @@ func TestProcessBackupCompletions(t *testing.T) {
// add the default backup storage location to the clientset and the informer/lister store
require.NoError(t, fakeClient.Create(context.Background(), defaultBackupLocation))
// Enable CSI feature flag for SnapshotDataMovement test.
if strings.Contains(test.name, "backup with snapshot data movement") {
features.Enable(velerov1api.CSIFeatureFlag)
}
actualResult, err := c.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Namespace: test.backup.Namespace, Name: test.backup.Name}})
assert.Equal(t, actualResult, ctrl.Result{})
assert.Nil(t, err)
// Disable CSI feature to not impact other test cases.
if strings.Contains(test.name, "backup with snapshot data movement") {
features.Disable(velerov1api.CSIFeatureFlag)
}
res := &velerov1api.Backup{}
err = c.kbClient.Get(context.Background(), kbclient.ObjectKey{Namespace: test.backup.Namespace, Name: test.backup.Name}, res)
require.NoError(t, err)

View File

@ -227,7 +227,7 @@ func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, du *
return r.errorOut(ctx, du, err, "error to initialize data path", log)
}
log.WithField("path", path.ByPath).Info("fs init")
if err := fsBackup.StartBackup(path, "", false, nil); err != nil {
if err := fsBackup.StartBackup(path, fmt.Sprintf("%s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC), "", false, nil); err != nil {
return r.errorOut(ctx, du, err, "error starting data path backup", log)
}

View File

@ -232,7 +232,7 @@ func (f *fakeDataUploadFSBR) Init(ctx context.Context, bslName string, sourceNam
return nil
}
func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error {
func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error {
du := f.du
original := f.du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted

View File

@ -178,7 +178,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
}
if err := fsBackup.StartBackup(path, parentSnapshotID, false, pvb.Spec.Tags); err != nil {
if err := fsBackup.StartBackup(path, "", parentSnapshotID, false, pvb.Spec.Tags); err != nil {
return r.errorOut(ctx, &pvb, err, "error starting data path backup", log)
}

View File

@ -103,7 +103,7 @@ func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace str
return nil
}
func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error {
func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error {
pvb := b.pvb
original := b.pvb.DeepCopy()

View File

@ -129,13 +129,13 @@ func (fs *fileSystemBR) Close(ctx context.Context) {
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed")
}
func (fs *fileSystemBR) StartBackup(source AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error {
func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error {
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, tags, forceFull, parentSnapshot, fs)
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)

View File

@ -95,12 +95,12 @@ func TestAsyncBackup(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
fs.uploaderProv = mockProvider
fs.initialized = true
fs.callbacks = test.callbacks
err := fs.StartBackup(AccessPoint{ByPath: test.path}, "", false, nil)
err := fs.StartBackup(AccessPoint{ByPath: test.path}, "", "", false, nil)
require.Equal(t, nil, err)
<-finish

View File

@ -61,7 +61,7 @@ type AsyncBR interface {
Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error
// StartBackup starts an asynchronous data path instance for backup
StartBackup(source AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error
StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error
// StartRestore starts an asynchronous data path instance for restore
StartRestore(snapshotID string, target AccessPoint) error

View File

@ -0,0 +1,330 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exposer
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
// GenericRestoreExposer is the interfaces for a generic restore exposer
type GenericRestoreExposer interface {
// Expose starts the process to a restore expose, the expose process may take long time
Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error
// GetExposed polls the status of the expose.
// If the expose is accessible by the current caller, it waits the expose ready and returns the expose result.
// Otherwise, it returns nil as the expose result without an error.
GetExposed(context.Context, corev1.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error)
// RebindVolume unexposes the restored PV and rebind it to the target PVC
RebindVolume(context.Context, corev1.ObjectReference, string, string, time.Duration) error
// CleanUp cleans up any objects generated during the restore expose
CleanUp(context.Context, corev1.ObjectReference)
}
// NewGenericRestoreExposer creates a new instance of generic restore exposer
func NewGenericRestoreExposer(kubeClient kubernetes.Interface, log logrus.FieldLogger) GenericRestoreExposer {
return &genericRestoreExposer{
kubeClient: kubeClient,
log: log,
}
}
type genericRestoreExposer struct {
kubeClient kubernetes.Interface
log logrus.FieldLogger
}
func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, timeout time.Duration) error {
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"target PVC": targetPVCName,
"source namespace": sourceNamespace,
})
selectedNode, targetPVC, err := kube.WaitPVCConsumed(ctx, e.kubeClient.CoreV1(), targetPVCName, sourceNamespace, e.kubeClient.StorageV1(), timeout)
if err != nil {
return errors.Wrapf(err, "error to wait target PVC consumed, %s/%s", sourceNamespace, targetPVCName)
}
curLog.WithField("target PVC", targetPVCName).WithField("selected node", selectedNode).Info("Target PVC is consumed")
restorePod, err := e.createRestorePod(ctx, ownerObject, hostingPodLabels, selectedNode)
if err != nil {
return errors.Wrapf(err, "error to create restore pod")
}
curLog.WithField("pod name", restorePod.Name).Info("Restore pod is created")
defer func() {
if err != nil {
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePod.Name, restorePod.Namespace, curLog)
}
}()
restorePVC, err := e.createRestorePVC(ctx, ownerObject, targetPVC, selectedNode)
if err != nil {
return errors.Wrap(err, "error to create restore pvc")
}
curLog.WithField("pvc name", restorePVC.Name).Info("Restore PVC is created")
defer func() {
if err != nil {
kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), restorePVC.Name, restorePVC.Namespace, curLog)
}
}()
return nil
}
func (e *genericRestoreExposer) GetExposed(ctx context.Context, ownerObject corev1.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"node": nodeName,
})
pod := &corev1.Pod{}
err := nodeClient.Get(ctx, types.NamespacedName{
Namespace: ownerObject.Namespace,
Name: restorePodName,
}, pod)
if err != nil {
if apierrors.IsNotFound(err) {
curLog.WithField("backup pod", restorePodName).Error("Backup pod is not running in the current node")
return nil, nil
} else {
return nil, errors.Wrapf(err, "error to get backup pod %s", restorePodName)
}
}
curLog.WithField("pod", pod.Name).Infof("Restore pod is in running state in node %s", pod.Spec.NodeName)
_, err = kube.WaitPVCBound(ctx, e.kubeClient.CoreV1(), e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, timeout)
if err != nil {
return nil, errors.Wrapf(err, "error to wait restore PVC bound, %s", restorePVCName)
}
curLog.WithField("restore pvc", restorePVCName).Info("Restore PVC is bound")
return &ExposeResult{ByPod: ExposeByPod{HostingPod: pod, PVC: restorePVCName}}, nil
}
func (e *genericRestoreExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference) {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name
kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log)
kube.DeletePVCIfAny(ctx, e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, e.log)
}
func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, timeout time.Duration) error {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"target PVC": targetPVCName,
"source namespace": sourceNamespace,
})
targetPVC, err := e.kubeClient.CoreV1().PersistentVolumeClaims(sourceNamespace).Get(ctx, targetPVCName, metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "error to get target PVC %s/%s", sourceNamespace, targetPVCName)
}
restorePV, err := kube.WaitPVCBound(ctx, e.kubeClient.CoreV1(), e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, timeout)
if err != nil {
return errors.Wrapf(err, "error to get PV from restore PVC %s", restorePVCName)
}
orgReclaim := restorePV.Spec.PersistentVolumeReclaimPolicy
curLog.WithField("restore PV", restorePV.Name).Info("Restore PV is retrieved")
retained, err := kube.SetPVReclaimPolicy(ctx, e.kubeClient.CoreV1(), restorePV, corev1.PersistentVolumeReclaimRetain)
if err != nil {
return errors.Wrapf(err, "error to retain PV %s", restorePV.Name)
}
curLog.WithField("restore PV", restorePV.Name).WithField("retained", (retained != nil)).Info("Restore PV is retained")
defer func() {
if retained != nil {
curLog.WithField("retained PV", retained.Name).Info("Deleting retained PV on error")
kube.DeletePVIfAny(ctx, e.kubeClient.CoreV1(), retained.Name, curLog)
}
}()
if retained != nil {
restorePV = retained
}
err = kube.EnsureDeletePod(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, timeout)
if err != nil {
return errors.Wrapf(err, "error to delete restore pod %s", restorePodName)
}
err = kube.EnsureDeletePVC(ctx, e.kubeClient.CoreV1(), restorePVCName, ownerObject.Namespace, timeout)
if err != nil {
return errors.Wrapf(err, "error to delete restore PVC %s", restorePVCName)
}
curLog.WithField("restore PVC", restorePVCName).Info("Restore PVC is deleted")
_, err = kube.RebindPVC(ctx, e.kubeClient.CoreV1(), targetPVC, restorePV.Name)
if err != nil {
return errors.Wrapf(err, "error to rebind target PVC %s/%s to %s", targetPVC.Namespace, targetPVC.Name, restorePV.Name)
}
curLog.WithField("tartet PVC", fmt.Sprintf("%s/%s", targetPVC.Namespace, targetPVC.Name)).WithField("restore PV", restorePV.Name).Info("Target PVC is rebound to restore PV")
var matchLabel map[string]string
if targetPVC.Spec.Selector != nil {
matchLabel = targetPVC.Spec.Selector.MatchLabels
}
restorePVName := restorePV.Name
restorePV, err = kube.ResetPVBinding(ctx, e.kubeClient.CoreV1(), restorePV, matchLabel)
if err != nil {
return errors.Wrapf(err, "error to reset binding info for restore PV %s", restorePVName)
}
curLog.WithField("restore PV", restorePV.Name).Info("Restore PV is rebound")
restorePV, err = kube.WaitPVBound(ctx, e.kubeClient.CoreV1(), restorePV.Name, targetPVC.Name, targetPVC.Namespace, timeout)
if err != nil {
return errors.Wrapf(err, "error to wait restore PV bound, restore PV %s", restorePVName)
}
curLog.WithField("restore PV", restorePV.Name).Info("Restore PV is ready")
retained = nil
_, err = kube.SetPVReclaimPolicy(ctx, e.kubeClient.CoreV1(), restorePV, orgReclaim)
if err != nil {
curLog.WithField("restore PV", restorePV.Name).WithError(err).Warn("Restore PV's reclaim policy is not restored")
} else {
curLog.WithField("restore PV", restorePV.Name).Info("Restore PV's reclaim policy is restored")
}
return nil
}
func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, label map[string]string, selectedNode string) (*corev1.Pod, error) {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name
var gracePeriod int64 = 0
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: restorePodName,
Namespace: ownerObject.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: ownerObject.APIVersion,
Kind: ownerObject.Kind,
Name: ownerObject.Name,
UID: ownerObject.UID,
Controller: boolptr.True(),
},
},
Labels: label,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: restorePodName,
Image: "alpine:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{"sleep", "infinity"},
VolumeMounts: []corev1.VolumeMount{{
Name: restorePVCName,
MountPath: "/" + restorePVCName,
}},
},
},
TerminationGracePeriodSeconds: &gracePeriod,
Volumes: []corev1.Volume{{
Name: restorePVCName,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: restorePVCName,
},
},
}},
NodeName: selectedNode,
},
}
return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
func (e *genericRestoreExposer) createRestorePVC(ctx context.Context, ownerObject corev1.ObjectReference, targetPVC *corev1.PersistentVolumeClaim, selectedNode string) (*corev1.PersistentVolumeClaim, error) {
restorePVCName := ownerObject.Name
pvcObj := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: ownerObject.Namespace,
Name: restorePVCName,
Labels: targetPVC.Labels,
Annotations: targetPVC.Annotations,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: ownerObject.APIVersion,
Kind: ownerObject.Kind,
Name: ownerObject.Name,
UID: ownerObject.UID,
Controller: boolptr.True(),
},
},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: targetPVC.Spec.AccessModes,
StorageClassName: targetPVC.Spec.StorageClassName,
VolumeMode: targetPVC.Spec.VolumeMode,
Resources: targetPVC.Spec.Resources,
},
}
if selectedNode != "" {
pvcObj.Annotations = map[string]string{
kube.KubeAnnSelectedNode: selectedNode,
}
}
return e.kubeClient.CoreV1().PersistentVolumeClaims(pvcObj.Namespace).Create(ctx, pvcObj, metav1.CreateOptions{})
}

View File

@ -0,0 +1,376 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exposer
import (
"context"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
corev1api "k8s.io/api/core/v1"
clientTesting "k8s.io/client-go/testing"
)
func TestRestoreExpose(t *testing.T) {
restore := &velerov1.Restore{
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1.SchemeGroupVersion.String(),
Kind: "Restore",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-restore",
UID: "fake-uid",
},
}
targetPVCObj := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "fake-target-pvc",
},
}
tests := []struct {
name string
kubeClientObj []runtime.Object
ownerRestore *velerov1.Restore
targetPVCName string
sourceNamespace string
kubeReactors []reactor
err string
}{
{
name: "wait target pvc consumed fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
err: "error to wait target PVC consumed, fake-ns/fake-target-pvc: error to wait for PVC: error to get pvc fake-ns/fake-target-pvc: persistentvolumeclaims \"fake-target-pvc\" not found",
},
{
name: "create restore pod fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
},
kubeReactors: []reactor{
{
verb: "create",
resource: "pods",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-create-error")
},
},
},
err: "error to create restore pod: fake-create-error",
},
{
name: "create restore pvc fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
},
kubeReactors: []reactor{
{
verb: "create",
resource: "persistentvolumeclaims",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-create-error")
},
},
},
err: "error to create restore pvc: fake-create-error",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
exposer := genericRestoreExposer{
kubeClient: fakeKubeClient,
log: velerotest.NewLogger(),
}
var ownerObject corev1api.ObjectReference
if test.ownerRestore != nil {
ownerObject = corev1api.ObjectReference{
Kind: test.ownerRestore.Kind,
Namespace: test.ownerRestore.Namespace,
Name: test.ownerRestore.Name,
UID: test.ownerRestore.UID,
APIVersion: test.ownerRestore.APIVersion,
}
}
err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, time.Millisecond)
assert.EqualError(t, err, test.err)
})
}
}
func TestRebindVolume(t *testing.T) {
restore := &velerov1.Restore{
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1.SchemeGroupVersion.String(),
Kind: "Restore",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-restore",
UID: "fake-uid",
},
}
targetPVCObj := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "fake-target-pvc",
},
}
restorePVCObj := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-restore",
},
Spec: corev1api.PersistentVolumeClaimSpec{
VolumeName: "fake-restore-pv",
},
}
restorePVObj := &corev1api.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-restore-pv",
},
Spec: corev1api.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1api.PersistentVolumeReclaimDelete,
},
}
restorePod := &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-restore",
},
}
hookCount := 0
tests := []struct {
name string
kubeClientObj []runtime.Object
ownerRestore *velerov1.Restore
targetPVCName string
sourceNamespace string
kubeReactors []reactor
err string
}{
{
name: "get target pvc fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
err: "error to get target PVC fake-ns/fake-target-pvc: persistentvolumeclaims \"fake-target-pvc\" not found",
},
{
name: "wait restore pvc bound fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
},
err: "error to get PV from restore PVC fake-restore: error to wait for rediness of PVC: error to get pvc velero/fake-restore: persistentvolumeclaims \"fake-restore\" not found",
},
{
name: "retain target pv fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
restorePVCObj,
restorePVObj,
},
kubeReactors: []reactor{
{
verb: "patch",
resource: "persistentvolumes",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-patch-error")
},
},
},
err: "error to retain PV fake-restore-pv: error patching PV: fake-patch-error",
},
{
name: "delete restore pod fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
restorePVCObj,
restorePVObj,
restorePod,
},
kubeReactors: []reactor{
{
verb: "delete",
resource: "pods",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-delete-error")
},
},
},
err: "error to delete restore pod fake-restore: error to delete pod fake-restore: fake-delete-error",
},
{
name: "delete restore pvc fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
restorePVCObj,
restorePVObj,
restorePod,
},
kubeReactors: []reactor{
{
verb: "delete",
resource: "persistentvolumeclaims",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-delete-error")
},
},
},
err: "error to delete restore PVC fake-restore: error to delete pvc fake-restore: fake-delete-error",
},
{
name: "rebind target pvc fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
restorePVCObj,
restorePVObj,
restorePod,
},
kubeReactors: []reactor{
{
verb: "patch",
resource: "persistentvolumeclaims",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-patch-error")
},
},
},
err: "error to rebind target PVC fake-ns/fake-target-pvc to fake-restore-pv: error patching PVC: fake-patch-error",
},
{
name: "reset pv binding fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
restorePVCObj,
restorePVObj,
restorePod,
},
kubeReactors: []reactor{
{
verb: "patch",
resource: "persistentvolumes",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
if hookCount == 0 {
hookCount++
return false, nil, nil
} else {
return true, nil, errors.New("fake-patch-error")
}
},
},
},
err: "error to reset binding info for restore PV fake-restore-pv: error patching PV: fake-patch-error",
},
{
name: "wait restore PV bound fail",
targetPVCName: "fake-target-pvc",
sourceNamespace: "fake-ns",
ownerRestore: restore,
kubeClientObj: []runtime.Object{
targetPVCObj,
restorePVCObj,
restorePVObj,
restorePod,
},
err: "error to wait restore PV bound, restore PV fake-restore-pv: error to wait for bound of PV: timed out waiting for the condition",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
exposer := genericRestoreExposer{
kubeClient: fakeKubeClient,
log: velerotest.NewLogger(),
}
var ownerObject corev1api.ObjectReference
if test.ownerRestore != nil {
ownerObject = corev1api.ObjectReference{
Kind: test.ownerRestore.Kind,
Namespace: test.ownerRestore.Namespace,
Name: test.ownerRestore.Name,
UID: test.ownerRestore.UID,
APIVersion: test.ownerRestore.APIVersion,
}
}
hookCount = 0
err := exposer.RebindVolume(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, time.Millisecond)
assert.EqualError(t, err, test.err)
})
}
}

View File

@ -703,6 +703,16 @@ func TestGetDeleteItemActions(t *testing.T) {
name: "No items",
names: []string{},
},
{
name: "Error getting restartable process",
names: []string{"velero.io/a", "velero.io/b", "velero.io/c"},
newRestartableProcessError: errors.Errorf("NewRestartableProcess"),
expectedError: "NewRestartableProcess",
},
{
name: "Happy path",
names: []string{"velero.io/a", "velero.io/b", "velero.io/c"},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
@ -739,9 +749,9 @@ func TestGetDeleteItemActions(t *testing.T) {
restartableProcess := &restartabletest.MockRestartableProcess{}
defer restartableProcess.AssertExpectations(t)
expected := &riav1cli.RestartableRestoreItemAction{
Key: process.KindAndName{Kind: pluginKind, Name: pluginName},
SharedPluginProcess: restartableProcess,
expected := &restartableDeleteItemAction{
key: process.KindAndName{Kind: pluginKind, Name: pluginName},
sharedPluginProcess: restartableProcess,
}
if tc.newRestartableProcessError != nil {

View File

@ -25,7 +25,7 @@ import (
proto "github.com/vmware-tanzu/velero/pkg/plugin/generated"
)
// RestoreItemActionPlugin is an implementation of go-plugin's Plugin
// DeleteItemActionPlugin is an implementation of go-plugin's Plugin
// interface with support for gRPC for the restore/ItemAction
// interface.
type DeleteItemActionPlugin struct {
@ -33,7 +33,7 @@ type DeleteItemActionPlugin struct {
*common.PluginBase
}
// GRPCClient returns a RestoreItemAction gRPC client.
// GRPCClient returns a DeleteItemAction gRPC client.
func (p *DeleteItemActionPlugin) GRPCClient(_ context.Context, _ *plugin.GRPCBroker, clientConn *grpc.ClientConn) (interface{}, error) {
return common.NewClientDispenser(p.ClientLogger, clientConn, newDeleteItemActionGRPCClient), nil
}

View File

@ -37,7 +37,7 @@ func NewDeleteItemActionPlugin(options ...common.PluginOption) *DeleteItemAction
}
}
// DeleteItemActionGRPCClient implements the backup/ItemAction interface and uses a
// DeleteItemActionGRPCClient implements the DeleteItemAction interface and uses a
// gRPC client to make calls to the plugin server.
type DeleteItemActionGRPCClient struct {
*common.ClientBase

View File

@ -460,7 +460,15 @@ func TestRestorePodVolumes(t *testing.T) {
assert.Equal(t, test.errs[i].err, errMsg)
} else {
assert.EqualError(t, errs[i], test.errs[i].err)
for i := 0; i < len(errs); i++ {
j := 0
for ; j < len(test.errs); j++ {
if errs[i].Error() == test.errs[j].err {
break
}
}
assert.Equal(t, true, j < len(test.errs))
}
}
}
}

View File

@ -153,19 +153,19 @@ func (a *ChangeImageNameAction) replaceImageName(obj *unstructured.Unstructured,
needUpdateObj := false
containers, _, err := unstructured.NestedSlice(obj.UnstructuredContent(), filed...)
if err != nil {
a.logger.Infof("UnstructuredConverter meet error: %v", err)
log.Infof("UnstructuredConverter meet error: %v", err)
return errors.Wrap(err, "error getting item's spec.containers")
}
if len(containers) == 0 {
return nil
}
for i, container := range containers {
a.logger.Infoln("container:", container)
log.Infoln("container:", container)
if image, ok := container.(map[string]interface{})["image"]; ok {
imageName := image.(string)
if exists, newImageName, err := a.isImageReplaceRuleExist(log, imageName, config); exists && err == nil {
needUpdateObj = true
a.logger.Infof("Updating item's image from %s to %s", imageName, newImageName)
log.Infof("Updating item's image from %s to %s", imageName, newImageName)
container.(map[string]interface{})["image"] = newImageName
containers[i] = container
}

View File

@ -0,0 +1,96 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
import (
"testing"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vmware-tanzu/velero/pkg/uploader"
)
type fakeProgressUpdater struct{}
func (f *fakeProgressUpdater) UpdateProgress(p *uploader.Progress) {}
func TestThrottle_ShouldOutput(t *testing.T) {
testCases := []struct {
interval time.Duration
throttle int64
expectedOutput bool
}{
{interval: time.Second, expectedOutput: true},
{interval: time.Second, throttle: time.Now().UnixNano() + int64(time.Nanosecond*10000), expectedOutput: false},
}
p := new(Progress)
for _, tc := range testCases {
// Setup
p.InitThrottle(tc.interval)
p.outputThrottle.throttle = int64(tc.throttle)
// Perform the test
output := p.outputThrottle.ShouldOutput()
// Verify the result
if output != tc.expectedOutput {
t.Errorf("Expected ShouldOutput to return %v, but got %v", tc.expectedOutput, output)
}
}
}
func TestProgress(t *testing.T) {
fileName := "test-filename"
var numBytes int64 = 1
testCases := []struct {
interval time.Duration
throttle int64
}{
{interval: time.Second},
{interval: time.Second, throttle: time.Now().UnixNano() + int64(time.Nanosecond*10000)},
}
p := new(Progress)
p.Log = logrus.New()
p.Updater = &fakeProgressUpdater{}
for _, tc := range testCases {
// Setup
p.InitThrottle(tc.interval)
p.outputThrottle.throttle = int64(tc.throttle)
p.InitThrottle(time.Duration(time.Second))
// All below calls put together for the implementation are empty or just very simple and just want to cover testing
// If wanting to write unit tests for some functions could remove it and with writing new function alone
p.UpdateProgress()
p.UploadedBytes(numBytes)
p.Error("test-path", nil, true)
p.Error("test-path", errors.New("processing error"), false)
p.UploadStarted()
p.EstimatedDataSize(1, numBytes)
p.CachedFile(fileName, numBytes)
p.HashedBytes(numBytes)
p.HashingFile(fileName)
p.ExcludedFile(fileName, numBytes)
p.ExcludedDir(fileName)
p.FinishedHashingFile(fileName, numBytes)
p.StartedDirectory(fileName)
p.FinishedDirectory(fileName)
p.UploadFinished()
p.ProgressBytes(numBytes, numBytes)
p.FinishedFile(fileName, nil)
}
}

View File

@ -0,0 +1,204 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
import (
"context"
"errors"
"testing"
"time"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/repo/object"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo/mocks"
)
func TestShimRepo(t *testing.T) {
ctx := context.Background()
backupRepo := &mocks.BackupRepo{}
backupRepo.On("Time").Return(time.Time{})
shim := NewShimRepo(backupRepo)
// All below calls put together for the implementation are empty or just very simple, and just want to cover testing
// If wanting to write unit tests for some functions could remove it and with writing new function alone
shim.VerifyObject(ctx, object.ID{})
shim.Time()
shim.ClientOptions()
shim.Refresh(ctx)
shim.ContentInfo(ctx, content.ID{})
shim.PrefetchContents(ctx, []content.ID{}, "hint")
shim.PrefetchObjects(ctx, []object.ID{}, "hint")
shim.UpdateDescription("desc")
shim.NewWriter(ctx, repo.WriteSessionOptions{})
shim.ReplaceManifests(ctx, map[string]string{}, nil)
shim.OnSuccessfulFlush(func(ctx context.Context, w repo.RepositoryWriter) error { return nil })
backupRepo.On("Close", mock.Anything).Return(nil)
NewShimRepo(backupRepo).Close(ctx)
var id udmrepo.ID
backupRepo.On("PutManifest", mock.Anything, mock.Anything).Return(id, nil)
NewShimRepo(backupRepo).PutManifest(ctx, map[string]string{}, nil)
var mf manifest.ID
backupRepo.On("DeleteManifest", mock.Anything, mock.Anything).Return(nil)
NewShimRepo(backupRepo).DeleteManifest(ctx, mf)
backupRepo.On("Flush", mock.Anything).Return(nil)
NewShimRepo(backupRepo).Flush(ctx)
var objID object.ID
backupRepo.On("ConcatenateObjects", mock.Anything, mock.Anything).Return(objID)
NewShimRepo(backupRepo).ConcatenateObjects(ctx, []object.ID{})
backupRepo.On("NewObjectWriter", mock.Anything, mock.Anything).Return(nil)
NewShimRepo(backupRepo).NewObjectWriter(ctx, object.WriterOptions{})
}
func TestOpenObject(t *testing.T) {
tests := []struct {
name string
backupRepo *mocks.BackupRepo
isOpenObjectError bool
isReaderNil bool
}{
{
name: "Success",
backupRepo: func() *mocks.BackupRepo {
backupRepo := &mocks.BackupRepo{}
backupRepo.On("OpenObject", mock.Anything, mock.Anything).Return(&shimObjectReader{}, nil)
return backupRepo
}(),
},
{
name: "Open object error",
backupRepo: func() *mocks.BackupRepo {
backupRepo := &mocks.BackupRepo{}
backupRepo.On("OpenObject", mock.Anything, mock.Anything).Return(&shimObjectReader{}, errors.New("Error open object"))
return backupRepo
}(),
isOpenObjectError: true,
},
{
name: "Get nil reader",
backupRepo: func() *mocks.BackupRepo {
backupRepo := &mocks.BackupRepo{}
backupRepo.On("OpenObject", mock.Anything, mock.Anything).Return(nil, nil)
return backupRepo
}(),
isReaderNil: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
reader, err := NewShimRepo(tc.backupRepo).OpenObject(ctx, object.ID{})
if tc.isOpenObjectError {
assert.Contains(t, err.Error(), "failed to open object")
} else if tc.isReaderNil {
assert.Nil(t, reader)
} else {
assert.NotNil(t, reader)
assert.Nil(t, err)
}
})
}
}
func TestFindManifests(t *testing.T) {
meta := []*udmrepo.ManifestEntryMetadata{}
tests := []struct {
name string
backupRepo *mocks.BackupRepo
isGetManifestError bool
}{
{
name: "Success",
backupRepo: func() *mocks.BackupRepo {
backupRepo := &mocks.BackupRepo{}
backupRepo.On("FindManifests", mock.Anything, mock.Anything).Return(meta, nil)
return backupRepo
}(),
},
{
name: "Failed to find manifest",
isGetManifestError: true,
backupRepo: func() *mocks.BackupRepo {
backupRepo := &mocks.BackupRepo{}
backupRepo.On("FindManifests", mock.Anything, mock.Anything).Return(meta,
errors.New("failed to find manifest"))
return backupRepo
}(),
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
_, err := NewShimRepo(tc.backupRepo).FindManifests(ctx, map[string]string{})
if tc.isGetManifestError {
assert.Contains(t, err.Error(), "failed")
} else {
assert.Nil(t, err)
}
})
}
}
func TestShimObjReader(t *testing.T) {
reader := new(shimObjectReader)
objReader := &mocks.ObjectReader{}
reader.repoReader = objReader
// All below calls put together for the implementation are empty or just very simple, and just want to cover testing
// If wanting to write unit tests for some functions could remove it and with writing new function alone
objReader.On("Seek", mock.Anything, mock.Anything).Return(int64(0), nil)
reader.Seek(int64(0), 0)
objReader.On("Read", mock.Anything).Return(0, nil)
reader.Read(nil)
objReader.On("Close").Return(nil)
reader.Close()
objReader.On("Length").Return(int64(0))
reader.Length()
}
func TestShimObjWriter(t *testing.T) {
writer := new(shimObjectWriter)
objWriter := &mocks.ObjectWriter{}
writer.repoWriter = objWriter
// All below calls put together for the implementation are empty or just very simple, and just want to cover testing
// If wanting to write unit tests for some functions could remove it and with writing new function alone
var id udmrepo.ID
objWriter.On("Checkpoint").Return(id, nil)
writer.Checkpoint()
objWriter.On("Result").Return(id, nil)
writer.Result()
objWriter.On("Write", mock.Anything).Return(0, nil)
writer.Write(nil)
objWriter.On("Close").Return(nil)
writer.Close()
}

View File

@ -47,6 +47,9 @@ import (
var applyRetentionPolicyFunc = policy.ApplyRetentionPolicy
var saveSnapshotFunc = snapshot.SaveSnapshot
var loadSnapshotFunc = snapshot.LoadSnapshot
var listSnapshotsFunc = snapshot.ListSnapshots
var filesystemEntryFunc = snapshotfs.FilesystemEntryFromIDWithPath
var restoreEntryFunc = restore.Entry
// SnapshotUploader which mainly used for UT test that could overwrite Upload interface
type SnapshotUploader interface {
@ -84,7 +87,7 @@ func setupDefaultPolicy() *policy.Tree {
}
// Backup backup specific sourcePath and update progress
func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string,
func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string,
forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
if fsUploader == nil {
return nil, false, errors.New("get empty kopia uploader")
@ -102,12 +105,18 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep
return nil, true, nil
}
dir = filepath.Clean(dir)
sourceInfo := snapshot.SourceInfo{
UserName: udmrepo.GetRepoUser(),
Host: udmrepo.GetRepoDomain(),
Path: filepath.Clean(dir),
Path: filepath.Clean(realSource),
}
rootDir, err := getLocalFSEntry(sourceInfo.Path)
if sourceInfo.Path == "" {
sourceInfo.Path = dir
}
rootDir, err := getLocalFSEntry(dir)
if err != nil {
return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
}
@ -173,6 +182,8 @@ func SnapshotSource(
var previous []*snapshot.Manifest
if !forceFull {
if parentSnapshot != "" {
log.Infof("Using provided parent snapshot %s", parentSnapshot)
mani, err := loadSnapshotFunc(ctx, rep, manifest.ID(parentSnapshot))
if err != nil {
return "", 0, errors.Wrapf(err, "Failed to load previous snapshot %v from kopia", parentSnapshot)
@ -180,13 +191,21 @@ func SnapshotSource(
previous = append(previous, mani)
} else {
pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, snapshotTags, nil)
log.Infof("Searching for parent snapshot")
pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, snapshotTags, nil, log)
if err != nil {
return "", 0, errors.Wrapf(err, "Failed to find previous kopia snapshot manifests for si %v", sourceInfo)
}
previous = pre
}
} else {
log.Info("Forcing full snapshot")
}
for i := range previous {
log.Infof("Using parent snapshot %s, start time %v, end time %v, description %s", previous[i].ID, previous[i].StartTime.ToTime(), previous[i].EndTime.ToTime(), previous[i].Description)
}
policyTree := setupDefaultPolicy()
@ -237,8 +256,8 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree)
// findPreviousSnapshotManifest returns the list of previous snapshots for a given source, including
// last complete snapshot following it.
func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, snapshotTags map[string]string, noLaterThan *fs.UTCTimestamp) ([]*snapshot.Manifest, error) {
man, err := snapshot.ListSnapshots(ctx, rep, sourceInfo)
func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, snapshotTags map[string]string, noLaterThan *fs.UTCTimestamp, log logrus.FieldLogger) ([]*snapshot.Manifest, error) {
man, err := listSnapshotsFunc(ctx, rep, sourceInfo)
if err != nil {
return nil, err
}
@ -247,6 +266,8 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour
var result []*snapshot.Manifest
for _, p := range man {
log.Debugf("Found one snapshot %s, start time %v, incomplete %s, tags %v", p.ID, p.StartTime.ToTime(), p.IncompleteReason, p.Tags)
requestor, found := p.Tags[uploader.SnapshotRequestorTag]
if !found {
continue
@ -294,7 +315,7 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
log.Infof("Restore from snapshot %s, description %s, created time %v, tags %v", snapshotID, snapshot.Description, snapshot.EndTime.ToTime(), snapshot.Tags)
rootEntry, err := snapshotfs.FilesystemEntryFromIDWithPath(kopiaCtx, rep, snapshotID, false)
rootEntry, err := filesystemEntryFunc(kopiaCtx, rep, snapshotID, false)
if err != nil {
return 0, 0, errors.Wrapf(err, "Unable to get filesystem entry for snapshot %v", snapshotID)
}
@ -317,7 +338,7 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
return 0, 0, errors.Wrap(err, "error to init output")
}
stat, err := restore.Entry(kopiaCtx, rep, output, rootEntry, restore.Options{
stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{
Parallel: runtime.NumCPU(),
RestoreDirEntryAtDepth: math.MaxInt32,
Cancel: cancleCh,

View File

@ -18,15 +18,23 @@ package kopia
import (
"context"
"strings"
"testing"
"time"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/restore"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
repomocks "github.com/vmware-tanzu/velero/pkg/repository/mocks"
"github.com/vmware-tanzu/velero/pkg/uploader"
uploadermocks "github.com/vmware-tanzu/velero/pkg/uploader/mocks"
)
@ -179,5 +187,545 @@ func TestSnapshotSource(t *testing.T) {
}
})
}
}
func TestReportSnapshotStatus(t *testing.T) {
testCases := []struct {
shouldError bool
expectedResult string
expectedSize int64
directorySummary *fs.DirectorySummary
expectedErrors []string
}{
{
shouldError: false,
expectedResult: "sample-manifest-id",
expectedSize: 1024,
directorySummary: &fs.DirectorySummary{
TotalFileSize: 1024,
},
},
{
shouldError: true,
expectedResult: "",
expectedSize: 0,
directorySummary: &fs.DirectorySummary{
FailedEntries: []*fs.EntryWithError{
{
EntryPath: "/path/to/file.txt",
Error: "Unknown file error",
},
},
},
expectedErrors: []string{"Error when processing /path/to/file.txt: Unknown file error"},
},
}
for _, tc := range testCases {
manifest := &snapshot.Manifest{
ID: manifest.ID("sample-manifest-id"),
Stats: snapshot.Stats{
TotalFileSize: 1024,
},
RootEntry: &snapshot.DirEntry{
DirSummary: tc.directorySummary,
},
}
result, size, err := reportSnapshotStatus(manifest, setupDefaultPolicy())
switch {
case tc.shouldError && err == nil:
t.Errorf("expected error, but got nil")
case !tc.shouldError && err != nil:
t.Errorf("unexpected error: %v", err)
case tc.shouldError && err != nil:
expectedErr := strings.Join(tc.expectedErrors, "\n")
if err.Error() != expectedErr {
t.Errorf("unexpected error: got %v, want %v", err, expectedErr)
}
}
if result != tc.expectedResult {
t.Errorf("unexpected result: got %v, want %v", result, tc.expectedResult)
}
if size != tc.expectedSize {
t.Errorf("unexpected size: got %v, want %v", size, tc.expectedSize)
}
}
}
func TestFindPreviousSnapshotManifest(t *testing.T) {
// Prepare test data
sourceInfo := snapshot.SourceInfo{
UserName: "user1",
Host: "host1",
Path: "/path/to/dir1",
}
snapshotTags := map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
}
noLaterThan := fs.UTCTimestampFromTime(time.Now())
testCases := []struct {
name string
listSnapshotsFunc func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error)
expectedSnapshots []*snapshot.Manifest
expectedError error
}{
// No matching snapshots
{
name: "No matching snapshots",
listSnapshotsFunc: func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error) {
return []*snapshot.Manifest{}, nil
},
expectedSnapshots: []*snapshot.Manifest{},
expectedError: nil,
},
{
name: "Error getting manifest",
listSnapshotsFunc: func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error) {
return []*snapshot.Manifest{}, errors.New("Error getting manifest")
},
expectedSnapshots: []*snapshot.Manifest{},
expectedError: errors.New("Error getting manifest"),
},
// Only one matching snapshot
{
name: "One matching snapshot",
listSnapshotsFunc: func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error) {
return []*snapshot.Manifest{
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value",
"anotherCustomTag": "123",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
},
}, nil
},
expectedSnapshots: []*snapshot.Manifest{
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value",
"anotherCustomTag": "123",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
},
},
expectedError: nil,
},
// Multiple matching snapshots
{
name: "Multiple matching snapshots",
listSnapshotsFunc: func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error) {
return []*snapshot.Manifest{
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value1",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
},
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value2",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
},
}, nil
},
expectedSnapshots: []*snapshot.Manifest{
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value1",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
},
},
expectedError: nil,
},
// Snapshot with different requestor
{
name: "Snapshot with different requestor",
listSnapshotsFunc: func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error) {
return []*snapshot.Manifest{
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user2",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value",
"snapshotRequestor": "user2",
"snapshotUploader": "uploader1",
},
},
}, nil
},
expectedSnapshots: []*snapshot.Manifest{},
expectedError: nil,
},
// Snapshot with different uploader
{
name: "Snapshot with different uploader",
listSnapshotsFunc: func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error) {
return []*snapshot.Manifest{
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader2",
"otherTag": "value",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader2",
},
},
}, nil
},
expectedSnapshots: []*snapshot.Manifest{},
expectedError: nil,
},
// Snapshot with a later start time
{
name: "Snapshot with a later start time",
listSnapshotsFunc: func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error) {
return []*snapshot.Manifest{
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
StartTime: fs.UTCTimestampFromTime(time.Now().Add(time.Hour)),
},
}, nil
},
expectedSnapshots: []*snapshot.Manifest{},
expectedError: nil,
},
// Snapshot with incomplete reason
{
name: "Snapshot with incomplete reason",
listSnapshotsFunc: func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error) {
return []*snapshot.Manifest{
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
IncompleteReason: "reason",
},
}, nil
},
expectedSnapshots: []*snapshot.Manifest{},
expectedError: nil,
},
// Multiple snapshots with some matching conditions
{
name: "Multiple snapshots with matching conditions",
listSnapshotsFunc: func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error) {
return []*snapshot.Manifest{
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value1",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
},
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value2",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
StartTime: fs.UTCTimestampFromTime(time.Now().Add(-time.Hour)),
IncompleteReason: "reason",
},
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value3",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
StartTime: fs.UTCTimestampFromTime(time.Now().Add(-time.Hour)),
},
}, nil
},
expectedSnapshots: []*snapshot.Manifest{
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value3",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
StartTime: fs.UTCTimestampFromTime(time.Now().Add(-time.Hour)),
},
},
expectedError: nil,
},
// Snapshot with manifest SnapshotRequestorTag not found
{
name: "Snapshot with manifest SnapshotRequestorTag not found",
listSnapshotsFunc: func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error) {
return []*snapshot.Manifest{
{
Tags: map[string]string{
"requestor": "user1",
uploader.SnapshotUploaderTag: "uploader1",
"otherTag": "value",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
IncompleteReason: "reason",
},
}, nil
},
expectedSnapshots: []*snapshot.Manifest{},
expectedError: nil,
},
// Snapshot with manifest SnapshotRequestorTag not found
{
name: "Snapshot with manifest SnapshotUploaderTag not found",
listSnapshotsFunc: func(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) ([]*snapshot.Manifest, error) {
return []*snapshot.Manifest{
{
Tags: map[string]string{
uploader.SnapshotRequestorTag: "user1",
"uploader": "uploader1",
"otherTag": "value",
"snapshotRequestor": "user1",
"snapshotUploader": "uploader1",
},
IncompleteReason: "reason",
},
}, nil
},
expectedSnapshots: []*snapshot.Manifest{},
expectedError: nil,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var repo repo.Repository
listSnapshotsFunc = tc.listSnapshotsFunc
snapshots, err := findPreviousSnapshotManifest(context.Background(), repo, sourceInfo, snapshotTags, &noLaterThan, logrus.New())
// Check if the returned error matches the expected error
if tc.expectedError != nil {
assert.Contains(t, err.Error(), tc.expectedError.Error())
} else {
assert.Nil(t, err)
}
// Check the number of returned snapshots
if len(snapshots) != len(tc.expectedSnapshots) {
t.Errorf("Expected %d snapshots, got %d", len(tc.expectedSnapshots), len(snapshots))
}
})
}
}
func TestBackup(t *testing.T) {
type testCase struct {
name string
sourcePath string
forceFull bool
parentSnapshot string
tags map[string]string
isEmptyUploader bool
isSnapshotSourceError bool
expectedError error
expectedEmpty bool
}
manifest := &snapshot.Manifest{
ID: "test",
RootEntry: &snapshot.DirEntry{},
}
// Define test cases
testCases := []testCase{
{
name: "Successful backup",
sourcePath: "/",
tags: map[string]string{},
expectedError: nil,
},
{
name: "Empty fsUploader",
isEmptyUploader: true,
sourcePath: "/",
tags: nil,
expectedError: errors.New("get empty kopia uploader"),
},
{
name: "Unable to read directory",
sourcePath: "/invalid/path",
tags: nil,
expectedError: errors.New("Unable to read dir"),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
s := injectSnapshotFuncs()
args := []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
{methodName: "SetPolicy", returns: []interface{}{nil}},
{methodName: "Upload", returns: []interface{}{manifest, nil}},
{methodName: "Flush", returns: []interface{}{nil}},
}
MockFuncs(s, args)
if tc.isSnapshotSourceError {
s.repoWriterMock.On("FindManifests", mock.Anything, mock.Anything).Return(nil, errors.New("Failed to get manifests"))
s.repoWriterMock.On("Flush", mock.Anything).Return(errors.New("Failed to get manifests"))
} else {
s.repoWriterMock.On("FindManifests", mock.Anything, mock.Anything).Return(nil, nil)
}
var isSnapshotEmpty bool
var snapshotInfo *uploader.SnapshotInfo
var err error
if tc.isEmptyUploader {
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), nil, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.tags, &logrus.Logger{})
} else {
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), s.uploderMock, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.tags, &logrus.Logger{})
}
// Check if the returned error matches the expected error
if tc.expectedError != nil {
assert.Contains(t, err.Error(), tc.expectedError.Error())
} else {
assert.Nil(t, err)
}
assert.Equal(t, tc.expectedEmpty, isSnapshotEmpty)
if err == nil {
assert.NotNil(t, snapshotInfo)
}
})
}
}
func TestRestore(t *testing.T) {
type testCase struct {
name string
snapshotID string
invalidManifestType bool
filesystemEntryFunc func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error)
restoreEntryFunc func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error)
dest string
expectedBytes int64
expectedCount int32
expectedError error
}
// Define test cases
testCases := []testCase{
{
name: "manifest is not a snapshot",
invalidManifestType: true,
dest: "/path/to/destination",
expectedError: errors.New("Unable to load snapshot"),
},
{
name: "Failed to get filesystem entry",
snapshotID: "snapshot-123",
expectedError: errors.New("Unable to get filesystem entry"),
},
{
name: "Failed to restore with filesystem entry",
filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
},
restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
return restore.Stats{}, errors.New("Unable to get filesystem entry")
},
snapshotID: "snapshot-123",
expectedError: errors.New("Unable to get filesystem entry"),
},
{
name: "Expect successful",
filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
},
restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
return restore.Stats{}, nil
},
snapshotID: "snapshot-123",
expectedError: nil,
},
}
em := &manifest.EntryMetadata{
ID: "test",
Labels: map[string]string{},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.invalidManifestType {
em.Labels[manifest.TypeLabelKey] = ""
} else {
em.Labels[manifest.TypeLabelKey] = snapshot.ManifestType
}
if tc.filesystemEntryFunc != nil {
filesystemEntryFunc = tc.filesystemEntryFunc
}
if tc.restoreEntryFunc != nil {
restoreEntryFunc = tc.restoreEntryFunc
}
repoWriterMock := &repomocks.RepositoryWriter{}
repoWriterMock.On("GetManifest", mock.Anything, mock.Anything, mock.Anything).Return(em, nil)
repoWriterMock.On("OpenObject", mock.Anything, mock.Anything).Return(em, nil)
progress := new(Progress)
bytesRestored, fileCount, err := Restore(context.Background(), repoWriterMock, progress, tc.snapshotID, tc.dest, logrus.New(), nil)
// Check if the returned error matches the expected error
if tc.expectedError != nil {
assert.Contains(t, err.Error(), tc.expectedError.Error())
} else {
assert.Nil(t, err)
}
// Check the number of bytes restored
assert.Equal(t, tc.expectedBytes, bytesRestored)
// Check the number of files restored
assert.Equal(t, tc.expectedCount, fileCount)
})
}
}

View File

@ -39,6 +39,7 @@ import (
// BackupFunc mainly used to make testing more convenient
var BackupFunc = kopia.Backup
var RestoreFunc = kopia.Restore
var BackupRepoServiceCreateFunc = service.Create
// kopiaProvider recorded info related with kopiaProvider
type kopiaProvider struct {
@ -73,7 +74,7 @@ func NewKopiaUploaderProvider(
return nil, errors.Wrapf(err, "error to get repo options")
}
repoSvc := service.Create(log)
repoSvc := BackupRepoServiceCreateFunc(log)
log.WithField("repoUID", repoUID).Info("Opening backup repo")
kp.bkRepo, err = repoSvc.Open(ctx, *repoOpt)
@ -113,6 +114,7 @@ func (kp *kopiaProvider) Close(ctx context.Context) error {
func (kp *kopiaProvider) RunBackup(
ctx context.Context,
path string,
realSource string,
tags map[string]string,
forceFull bool,
parentSnapshot string,
@ -121,8 +123,13 @@ func (kp *kopiaProvider) RunBackup(
return "", false, errors.New("Need to initial backup progress updater first")
}
if path == "" {
return "", false, errors.New("path is empty")
}
log := kp.log.WithFields(logrus.Fields{
"path": path,
"realSource": realSource,
"parentSnapshot": parentSnapshot,
})
repoWriter := kopia.NewShimRepo(kp.bkRepo)
@ -146,7 +153,7 @@ func (kp *kopiaProvider) RunBackup(
tags[uploader.SnapshotRequestorTag] = kp.requestorType
tags[uploader.SnapshotUploaderTag] = uploader.KopiaType
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, forceFull, parentSnapshot, tags, log)
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, tags, log)
if err != nil {
if kpUploader.IsCanceled() {
log.Error("Kopia backup is canceled")

View File

@ -18,48 +18,76 @@ package provider
import (
"context"
"sync"
"testing"
"time"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/credentials/mocks"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
"github.com/vmware-tanzu/velero/pkg/repository"
udmrepo "github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
udmrepomocks "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/mocks"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/uploader/kopia"
)
type FakeBackupProgressUpdater struct {
PodVolumeBackup *velerov1api.PodVolumeBackup
Log logrus.FieldLogger
Ctx context.Context
Cli client.Client
}
func (f *FakeBackupProgressUpdater) UpdateProgress(p *uploader.Progress) {}
type FakeRestoreProgressUpdater struct {
PodVolumeRestore *velerov1api.PodVolumeRestore
Log logrus.FieldLogger
Ctx context.Context
Cli client.Client
}
func (f *FakeRestoreProgressUpdater) UpdateProgress(p *uploader.Progress) {}
func TestRunBackup(t *testing.T) {
var kp kopiaProvider
kp.log = logrus.New()
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
testCases := []struct {
name string
hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
hookBackupFunc func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
notError bool
}{
{
name: "success to backup",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, nil
},
notError: true,
},
{
name: "get error to backup",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, errors.New("failed to backup")
},
notError: false,
},
{
name: "got empty snapshot",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, errors.New("snapshot is empty")
},
notError: false,
@ -68,7 +96,7 @@ func TestRunBackup(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
BackupFunc = tc.hookBackupFunc
_, _, err := kp.RunBackup(context.Background(), "var", nil, false, "", &updater)
_, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", &updater)
if tc.notError {
assert.NoError(t, err)
} else {
@ -117,20 +145,227 @@ func TestRunRestore(t *testing.T) {
}
}
type FakeBackupProgressUpdater struct {
PodVolumeBackup *velerov1api.PodVolumeBackup
Log logrus.FieldLogger
Ctx context.Context
Cli client.Client
func TestCheckContext(t *testing.T) {
testCases := []struct {
name string
finishChan chan struct{}
restoreChan chan struct{}
uploader *snapshotfs.Uploader
expectCancel bool
expectBackup bool
expectRestore bool
}{
{
name: "FinishChan",
finishChan: make(chan struct{}),
restoreChan: make(chan struct{}),
uploader: &snapshotfs.Uploader{},
expectCancel: false,
expectBackup: false,
expectRestore: false,
},
{
name: "nil uploader",
finishChan: make(chan struct{}),
restoreChan: make(chan struct{}),
uploader: nil,
expectCancel: true,
expectBackup: false,
expectRestore: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
if tc.expectBackup {
go func() {
wg.Wait()
tc.restoreChan <- struct{}{}
}()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
wg.Done()
}()
kp := &kopiaProvider{log: logrus.New()}
kp.CheckContext(ctx, tc.finishChan, tc.restoreChan, tc.uploader)
if tc.expectCancel && tc.uploader != nil {
t.Error("Expected the uploader to be cancelled")
}
if tc.expectBackup && tc.uploader == nil && len(tc.restoreChan) > 0 {
t.Error("Expected the restore channel to be closed")
}
})
}
}
func (f *FakeBackupProgressUpdater) UpdateProgress(p *uploader.Progress) {}
func TestGetPassword(t *testing.T) {
testCases := []struct {
name string
empytSecret bool
credGetterFunc func(*mocks.SecretStore, *v1.SecretKeySelector)
expectError bool
expectedPass string
}{
{
name: "valid credentials interface",
credGetterFunc: func(ss *mocks.SecretStore, selector *v1.SecretKeySelector) {
ss.On("Get", selector).Return("test", nil)
},
expectError: false,
expectedPass: "test",
},
{
name: "empty from secret",
empytSecret: true,
expectError: true,
expectedPass: "",
},
{
name: "ErrorGettingPassword",
credGetterFunc: func(ss *mocks.SecretStore, selector *v1.SecretKeySelector) {
ss.On("Get", selector).Return("", errors.New("error getting password"))
},
expectError: true,
expectedPass: "",
},
}
type FakeRestoreProgressUpdater struct {
PodVolumeRestore *velerov1api.PodVolumeRestore
Log logrus.FieldLogger
Ctx context.Context
Cli client.Client
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Mock CredentialGetter
credGetter := &credentials.CredentialGetter{}
mockCredGetter := &mocks.SecretStore{}
if !tc.empytSecret {
credGetter.FromSecret = mockCredGetter
}
repoKeySelector := &v1.SecretKeySelector{LocalObjectReference: v1.LocalObjectReference{Name: "velero-repo-credentials"}, Key: "repository-password"}
if tc.credGetterFunc != nil {
tc.credGetterFunc(mockCredGetter, repoKeySelector)
}
kp := &kopiaProvider{
credGetter: credGetter,
}
password, err := kp.GetPassword(nil)
if tc.expectError {
assert.Error(t, err, "Expected an error")
} else {
assert.NoError(t, err, "Expected no error")
}
assert.Equal(t, tc.expectedPass, password, "Expected password to match")
})
}
}
func (f *FakeRestoreProgressUpdater) UpdateProgress(p *uploader.Progress) {}
func (m *MockCredentialGetter) GetCredentials() (string, error) {
args := m.Called()
return args.String(0), args.Error(1)
}
// MockRepoSvc is a mock implementation of the RepoService interface.
type MockRepoSvc struct {
mock.Mock
}
func (m *MockRepoSvc) Open(ctx context.Context, opts udmrepo.RepoOptions) (udmrepo.BackupRepo, error) {
args := m.Called(ctx, opts)
return args.Get(0).(udmrepo.BackupRepo), args.Error(1)
}
func TestNewKopiaUploaderProvider(t *testing.T) {
requestorType := "testRequestor"
ctx := context.Background()
backupRepo := repository.NewBackupRepository(velerov1api.DefaultNamespace, repository.BackupRepositoryKey{VolumeNamespace: "fake-volume-ns-02", BackupLocation: "fake-bsl-02", RepositoryType: "fake-repository-type-02"})
mockLog := logrus.New()
// Define test cases
testCases := []struct {
name string
mockCredGetter *mocks.SecretStore
mockBackupRepoService udmrepo.BackupRepoService
expectedError string
}{
{
name: "Success",
mockCredGetter: func() *mocks.SecretStore {
mockCredGetter := &mocks.SecretStore{}
mockCredGetter.On("Get", mock.Anything).Return("test", nil)
return mockCredGetter
}(),
mockBackupRepoService: func() udmrepo.BackupRepoService {
backupRepoService := &udmrepomocks.BackupRepoService{}
var backupRepo udmrepo.BackupRepo
backupRepoService.On("Open", context.Background(), mock.Anything).Return(backupRepo, nil)
return backupRepoService
}(),
expectedError: "",
},
{
name: "Error to get repo options",
mockCredGetter: func() *mocks.SecretStore {
mockCredGetter := &mocks.SecretStore{}
mockCredGetter.On("Get", mock.Anything).Return("test", errors.New("failed to get password"))
return mockCredGetter
}(),
mockBackupRepoService: func() udmrepo.BackupRepoService {
backupRepoService := &udmrepomocks.BackupRepoService{}
var backupRepo udmrepo.BackupRepo
backupRepoService.On("Open", context.Background(), mock.Anything).Return(backupRepo, nil)
return backupRepoService
}(),
expectedError: "error to get repo options",
},
{
name: "Error open repository service",
mockCredGetter: func() *mocks.SecretStore {
mockCredGetter := &mocks.SecretStore{}
mockCredGetter.On("Get", mock.Anything).Return("test", nil)
return mockCredGetter
}(),
mockBackupRepoService: func() udmrepo.BackupRepoService {
backupRepoService := &udmrepomocks.BackupRepoService{}
var backupRepo udmrepo.BackupRepo
backupRepoService.On("Open", context.Background(), mock.Anything).Return(backupRepo, errors.New("failed to init repository"))
return backupRepoService
}(),
expectedError: "Failed to find kopia repository",
},
}
// Iterate through test cases
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
credGetter := &credentials.CredentialGetter{FromSecret: tc.mockCredGetter}
BackupRepoServiceCreateFunc = func(logger logrus.FieldLogger) udmrepo.BackupRepoService {
return tc.mockBackupRepoService
}
// Call the function being tested.
_, err := NewKopiaUploaderProvider(requestorType, ctx, credGetter, backupRepo, mockLog)
// Assertions
if tc.expectedError != "" {
assert.Contains(t, err.Error(), tc.expectedError)
} else {
assert.Nil(t, err)
}
// Verify that the expected methods were called on the mocks.
tc.mockCredGetter.AssertExpectations(t)
})
}
}

View File

@ -29,30 +29,30 @@ func (_m *Provider) Close(ctx context.Context) error {
return r0
}
// RunBackup provides a mock function with given fields: ctx, path, tags, forceFull, parentSnapshot, updater
func (_m *Provider) RunBackup(ctx context.Context, path string, tags map[string]string, forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) {
ret := _m.Called(ctx, path, tags, forceFull, parentSnapshot, updater)
// RunBackup provides a mock function with given fields: ctx, path, realSource, tags, forceFull, parentSnapshot, updater
func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) {
ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
var r0 string
var r1 bool
var r2 error
if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) (string, bool, error)); ok {
return rf(ctx, path, tags, forceFull, parentSnapshot, updater)
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) (string, bool, error)); ok {
return rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
}
if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) string); ok {
r0 = rf(ctx, path, tags, forceFull, parentSnapshot, updater)
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) string); ok {
r0 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
} else {
r0 = ret.Get(0).(string)
}
if rf, ok := ret.Get(1).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) bool); ok {
r1 = rf(ctx, path, tags, forceFull, parentSnapshot, updater)
if rf, ok := ret.Get(1).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) bool); ok {
r1 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
} else {
r1 = ret.Get(1).(bool)
}
if rf, ok := ret.Get(2).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) error); ok {
r2 = rf(ctx, path, tags, forceFull, parentSnapshot, updater)
if rf, ok := ret.Get(2).(func(context.Context, string, string, map[string]string, bool, string, uploader.ProgressUpdater) error); ok {
r2 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
} else {
r2 = ret.Error(2)
}

View File

@ -44,6 +44,7 @@ type Provider interface {
RunBackup(
ctx context.Context,
path string,
realSource string,
tags map[string]string,
forceFull bool,
parentSnapshot string,

View File

@ -0,0 +1,98 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"context"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/credentials/mocks"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
)
type NewUploaderProviderTestCase struct {
Description string
UploaderType string
RequestorType string
ExpectedError string
needFromFile bool
}
func TestNewUploaderProvider(t *testing.T) {
// Mock objects or dependencies
ctx := context.Background()
client := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()
repoIdentifier := "repoIdentifier"
bsl := &velerov1api.BackupStorageLocation{}
backupRepo := &velerov1api.BackupRepository{}
credGetter := &credentials.CredentialGetter{}
repoKeySelector := &v1.SecretKeySelector{}
log := logrus.New()
testCases := []NewUploaderProviderTestCase{
{
Description: "When requestorType is empty, it should return an error",
UploaderType: "kopia",
RequestorType: "",
ExpectedError: "requestor type is empty",
},
{
Description: "When FileStore credential is uninitialized, it should return an error",
UploaderType: "kopia",
RequestorType: "requestor",
ExpectedError: "uninitialized FileStore credentail",
},
{
Description: "When uploaderType is kopia, it should return a KopiaUploaderProvider",
UploaderType: "kopia",
RequestorType: "requestor",
needFromFile: true,
ExpectedError: "invalid credentials interface",
},
{
Description: "When uploaderType is not kopia, it should return a ResticUploaderProvider",
UploaderType: "restic",
RequestorType: "requestor",
needFromFile: true,
ExpectedError: "",
},
}
for _, testCase := range testCases {
t.Run(testCase.Description, func(t *testing.T) {
if testCase.needFromFile {
mockFileGetter := &mocks.FileStore{}
mockFileGetter.On("Path", &v1.SecretKeySelector{}).Return("", nil)
credGetter.FromFile = mockFileGetter
}
_, err := NewUploaderProvider(ctx, client, testCase.UploaderType, testCase.RequestorType, repoIdentifier, bsl, backupRepo, credGetter, repoKeySelector, log)
if testCase.ExpectedError == "" {
assert.Nil(t, err)
} else {
assert.Contains(t, err.Error(), testCase.ExpectedError)
}
})
}
}

View File

@ -33,9 +33,14 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
// ResticBackupCMDFunc and ResticRestoreCMDFunc are mainly used to make testing more convenient
var ResticBackupCMDFunc = restic.BackupCommand
var ResticRestoreCMDFunc = restic.RestoreCommand
// resticBackupCMDFunc and resticRestoreCMDFunc are mainly used to make testing more convenient
var resticBackupCMDFunc = restic.BackupCommand
var resticBackupFunc = restic.RunBackup
var resticGetSnapshotFunc = restic.GetSnapshotCommand
var resticGetSnapshotIDFunc = restic.GetSnapshotID
var resticRestoreCMDFunc = restic.RestoreCommand
var resticTempCACertFileFunc = restic.TempCACertFile
var resticCmdEnvFunc = restic.CmdEnv
type resticProvider struct {
repoIdentifier string
@ -68,13 +73,13 @@ func NewResticUploaderProvider(
// if there's a caCert on the ObjectStorage, write it to disk so that it can be passed to restic
if bsl.Spec.ObjectStorage != nil && bsl.Spec.ObjectStorage.CACert != nil {
provider.caCertFile, err = restic.TempCACertFile(bsl.Spec.ObjectStorage.CACert, bsl.Name, filesystem.NewFileSystem())
provider.caCertFile, err = resticTempCACertFileFunc(bsl.Spec.ObjectStorage.CACert, bsl.Name, filesystem.NewFileSystem())
if err != nil {
return nil, errors.Wrap(err, "error create temp cert file")
}
}
provider.cmdEnv, err = restic.CmdEnv(bsl, credGetter.FromFile)
provider.cmdEnv, err = resticCmdEnvFunc(bsl, credGetter.FromFile)
if err != nil {
return nil, errors.Wrap(err, "error generating repository cmnd env")
}
@ -112,6 +117,7 @@ func (rp *resticProvider) Close(ctx context.Context) error {
func (rp *resticProvider) RunBackup(
ctx context.Context,
path string,
realSource string,
tags map[string]string,
forceFull bool,
parentSnapshot string,
@ -120,12 +126,20 @@ func (rp *resticProvider) RunBackup(
return "", false, errors.New("Need to initial backup progress updater first")
}
if path == "" {
return "", false, errors.New("path is empty")
}
if realSource != "" {
return "", false, errors.New("real source is not empty, this is not supported by restic uploader")
}
log := rp.log.WithFields(logrus.Fields{
"path": path,
"parentSnapshot": parentSnapshot,
})
backupCmd := ResticBackupCMDFunc(rp.repoIdentifier, rp.credentialsFile, path, tags)
backupCmd := resticBackupCMDFunc(rp.repoIdentifier, rp.credentialsFile, path, tags)
backupCmd.Env = rp.cmdEnv
backupCmd.CACertFile = rp.caCertFile
if len(rp.extraFlags) != 0 {
@ -136,7 +150,7 @@ func (rp *resticProvider) RunBackup(
backupCmd.ExtraFlags = append(backupCmd.ExtraFlags, fmt.Sprintf("--parent=%s", parentSnapshot))
}
summary, stderrBuf, err := restic.RunBackup(backupCmd, log, updater)
summary, stderrBuf, err := resticBackupFunc(backupCmd, log, updater)
if err != nil {
if strings.Contains(stderrBuf, "snapshot is empty") {
log.Debugf("Restic backup got empty dir with %s path", path)
@ -145,13 +159,13 @@ func (rp *resticProvider) RunBackup(
return "", false, errors.WithStack(fmt.Errorf("error running restic backup command %s with error: %v stderr: %v", backupCmd.String(), err, stderrBuf))
}
// GetSnapshotID
snapshotIDCmd := restic.GetSnapshotCommand(rp.repoIdentifier, rp.credentialsFile, tags)
snapshotIDCmd := resticGetSnapshotFunc(rp.repoIdentifier, rp.credentialsFile, tags)
snapshotIDCmd.Env = rp.cmdEnv
snapshotIDCmd.CACertFile = rp.caCertFile
if len(rp.extraFlags) != 0 {
snapshotIDCmd.ExtraFlags = append(snapshotIDCmd.ExtraFlags, rp.extraFlags...)
}
snapshotID, err := restic.GetSnapshotID(snapshotIDCmd)
snapshotID, err := resticGetSnapshotIDFunc(snapshotIDCmd)
if err != nil {
return "", false, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err))
}
@ -174,7 +188,7 @@ func (rp *resticProvider) RunRestore(
"volumePath": volumePath,
})
restoreCmd := ResticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath)
restoreCmd := resticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath)
restoreCmd.Env = rp.cmdEnv
restoreCmd.CACertFile = rp.caCertFile
if len(rp.extraFlags) != 0 {

View File

@ -18,89 +18,369 @@ package provider
import (
"context"
"errors"
"io/ioutil"
"os"
"strings"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
func TestResticRunBackup(t *testing.T) {
var rp resticProvider
rp.log = logrus.New()
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
testCases := []struct {
name string
hookBackupFunc func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command
hookRunBackupFunc func(backupCmd *restic.Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error)
errorHandleFunc func(err error) bool
name string
nilUpdater bool
parentSnapshot string
rp *resticProvider
hookBackupFunc func(string, string, string, map[string]string) *restic.Command
hookResticBackupFunc func(*restic.Command, logrus.FieldLogger, uploader.ProgressUpdater) (string, string, error)
hookResticGetSnapshotFunc func(string, string, map[string]string) *restic.Command
hookResticGetSnapshotIDFunc func(*restic.Command) (string, error)
errorHandleFunc func(err error) bool
}{
{
name: "wrong restic execute command",
name: "nil uploader",
rp: &resticProvider{log: logrus.New()},
nilUpdater: true,
hookBackupFunc: func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command {
return &restic.Command{Command: "date"}
},
errorHandleFunc: func(err error) bool {
return strings.Contains(err.Error(), "executable file not found in")
return strings.Contains(err.Error(), "Need to initial backup progress updater first")
},
},
{
name: "wrong parsing json summary content",
name: "wrong restic execute command",
rp: &resticProvider{log: logrus.New()},
hookBackupFunc: func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command {
return &restic.Command{Command: "version"}
return &restic.Command{Command: "date"}
},
errorHandleFunc: func(err error) bool {
return strings.Contains(err.Error(), "executable file not found in")
return strings.Contains(err.Error(), "error running")
},
}, {
name: "has parent snapshot",
rp: &resticProvider{log: logrus.New()},
parentSnapshot: "parentSnapshot",
hookBackupFunc: func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command {
return &restic.Command{Command: "date"}
},
hookResticBackupFunc: func(*restic.Command, logrus.FieldLogger, uploader.ProgressUpdater) (string, string, error) {
return "", "", nil
},
hookResticGetSnapshotIDFunc: func(*restic.Command) (string, error) { return "test-snapshot-id", nil },
errorHandleFunc: func(err error) bool {
return err == nil
},
},
{
name: "has extra flags",
rp: &resticProvider{log: logrus.New(), extraFlags: []string{"testFlags"}},
hookBackupFunc: func(string, string, string, map[string]string) *restic.Command {
return &restic.Command{Command: "date"}
},
hookResticBackupFunc: func(*restic.Command, logrus.FieldLogger, uploader.ProgressUpdater) (string, string, error) {
return "", "", nil
},
hookResticGetSnapshotIDFunc: func(*restic.Command) (string, error) { return "test-snapshot-id", nil },
errorHandleFunc: func(err error) bool {
return err == nil
},
},
{
name: "failed to get snapshot id",
rp: &resticProvider{log: logrus.New(), extraFlags: []string{"testFlags"}},
hookBackupFunc: func(string, string, string, map[string]string) *restic.Command {
return &restic.Command{Command: "date"}
},
hookResticBackupFunc: func(*restic.Command, logrus.FieldLogger, uploader.ProgressUpdater) (string, string, error) {
return "", "", nil
},
hookResticGetSnapshotIDFunc: func(*restic.Command) (string, error) {
return "test-snapshot-id", errors.New("failed to get snapshot id")
},
errorHandleFunc: func(err error) bool {
return strings.Contains(err.Error(), "failed to get snapshot id")
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ResticBackupCMDFunc = tc.hookBackupFunc
_, _, err := rp.RunBackup(context.Background(), "var", nil, false, "", &updater)
rp.log.Infof("test name %v error %v", tc.name, err)
var err error
parentSnapshot := tc.parentSnapshot
if tc.hookBackupFunc != nil {
resticBackupCMDFunc = tc.hookBackupFunc
}
if tc.hookResticBackupFunc != nil {
resticBackupFunc = tc.hookResticBackupFunc
}
if tc.hookResticGetSnapshotFunc != nil {
resticGetSnapshotFunc = tc.hookResticGetSnapshotFunc
}
if tc.hookResticGetSnapshotIDFunc != nil {
resticGetSnapshotIDFunc = tc.hookResticGetSnapshotIDFunc
}
if !tc.nilUpdater {
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, &updater)
} else {
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, nil)
}
tc.rp.log.Infof("test name %v error %v", tc.name, err)
require.Equal(t, true, tc.errorHandleFunc(err))
})
}
}
func TestResticRunRestore(t *testing.T) {
var rp resticProvider
rp.log = logrus.New()
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
ResticRestoreCMDFunc = func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command {
resticRestoreCMDFunc = func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command {
return &restic.Command{Args: []string{""}}
}
testCases := []struct {
name string
rp *resticProvider
nilUpdater bool
hookResticRestoreFunc func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command
errorHandleFunc func(err error) bool
}{
{
name: "wrong restic execute command",
name: "wrong restic execute command",
rp: &resticProvider{log: logrus.New()},
nilUpdater: true,
errorHandleFunc: func(err error) bool {
return strings.Contains(err.Error(), "Need to initial backup progress updater first")
},
},
{
name: "has extral flags",
rp: &resticProvider{log: logrus.New(), extraFlags: []string{"test-extra-flags"}},
hookResticRestoreFunc: func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command {
return &restic.Command{Args: []string{"date"}}
},
errorHandleFunc: func(err error) bool {
return strings.Contains(err.Error(), "executable file not found ")
return strings.Contains(err.Error(), "error running command")
},
},
{
name: "wrong restic execute command",
rp: &resticProvider{log: logrus.New()},
hookResticRestoreFunc: func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command {
return &restic.Command{Args: []string{"date"}}
},
errorHandleFunc: func(err error) bool {
return strings.Contains(err.Error(), "error running command")
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ResticRestoreCMDFunc = tc.hookResticRestoreFunc
err := rp.RunRestore(context.Background(), "", "var", &updater)
rp.log.Infof("test name %v error %v", tc.name, err)
resticRestoreCMDFunc = tc.hookResticRestoreFunc
var err error
if !tc.nilUpdater {
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
err = tc.rp.RunRestore(context.Background(), "", "var", &updater)
} else {
err = tc.rp.RunRestore(context.Background(), "", "var", nil)
}
tc.rp.log.Infof("test name %v error %v", tc.name, err)
require.Equal(t, true, tc.errorHandleFunc(err))
})
}
}
func TestClose(t *testing.T) {
t.Run("Delete existing credentials file", func(t *testing.T) {
// Create temporary files for the credentials and caCert
credentialsFile, err := ioutil.TempFile("", "credentialsFile")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(credentialsFile.Name())
caCertFile, err := ioutil.TempFile("", "caCertFile")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(caCertFile.Name())
rp := &resticProvider{
credentialsFile: credentialsFile.Name(),
caCertFile: caCertFile.Name(),
}
// Test deleting an existing credentials file
err = rp.Close(context.Background())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
_, err = os.Stat(rp.credentialsFile)
if !os.IsNotExist(err) {
t.Errorf("expected credentials file to be deleted, got error: %v", err)
}
})
t.Run("Delete existing caCert file", func(t *testing.T) {
// Create temporary files for the credentials and caCert
caCertFile, err := ioutil.TempFile("", "caCertFile")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(caCertFile.Name())
rp := &resticProvider{
credentialsFile: "",
caCertFile: "",
}
err = rp.Close(context.Background())
// Test deleting an existing caCert file
if err != nil {
t.Errorf("unexpected error: %v", err)
}
_, err = os.Stat(rp.caCertFile)
if !os.IsNotExist(err) {
t.Errorf("expected caCert file to be deleted, got error: %v", err)
}
})
}
type MockCredentialGetter struct {
mock.Mock
}
func (m *MockCredentialGetter) Path(selector *v1.SecretKeySelector) (string, error) {
args := m.Called(selector)
return args.Get(0).(string), args.Error(1)
}
func TestNewResticUploaderProvider(t *testing.T) {
testCases := []struct {
name string
emptyBSL bool
mockCredFunc func(*MockCredentialGetter, *v1.SecretKeySelector)
resticCmdEnvFunc func(backupLocation *velerov1api.BackupStorageLocation, credentialFileStore credentials.FileStore) ([]string, error)
resticTempCACertFileFunc func(caCert []byte, bsl string, fs filesystem.Interface) (string, error)
checkFunc func(provider Provider, err error)
}{
{
name: "No error in creating temp credentials file",
mockCredFunc: func(credGetter *MockCredentialGetter, repoKeySelector *v1.SecretKeySelector) {
credGetter.On("Path", repoKeySelector).Return("temp-credentials", nil)
},
checkFunc: func(provider Provider, err error) {
assert.NoError(t, err)
assert.NotNil(t, provider)
},
}, {
name: "Error in creating temp credentials file",
mockCredFunc: func(credGetter *MockCredentialGetter, repoKeySelector *v1.SecretKeySelector) {
credGetter.On("Path", repoKeySelector).Return("", errors.New("error creating temp credentials file"))
},
checkFunc: func(provider Provider, err error) {
assert.Error(t, err)
assert.Nil(t, provider)
},
}, {
name: "ObjectStorage with CACert present and creating CACert file failed",
mockCredFunc: func(credGetter *MockCredentialGetter, repoKeySelector *v1.SecretKeySelector) {
credGetter.On("Path", repoKeySelector).Return("temp-credentials", nil)
},
resticTempCACertFileFunc: func(caCert []byte, bsl string, fs filesystem.Interface) (string, error) {
return "", errors.New("error writing CACert file")
},
checkFunc: func(provider Provider, err error) {
assert.Error(t, err)
assert.Nil(t, provider)
},
}, {
name: "Generating repository cmd failed",
mockCredFunc: func(credGetter *MockCredentialGetter, repoKeySelector *v1.SecretKeySelector) {
credGetter.On("Path", repoKeySelector).Return("temp-credentials", nil)
},
resticTempCACertFileFunc: func(caCert []byte, bsl string, fs filesystem.Interface) (string, error) {
return "test-ca", nil
},
resticCmdEnvFunc: func(backupLocation *velerov1api.BackupStorageLocation, credentialFileStore credentials.FileStore) ([]string, error) {
return nil, errors.New("error generating repository cmnd env")
},
checkFunc: func(provider Provider, err error) {
assert.Error(t, err)
assert.Nil(t, provider)
},
}, {
name: "New provider with not nil bsl",
mockCredFunc: func(credGetter *MockCredentialGetter, repoKeySelector *v1.SecretKeySelector) {
credGetter.On("Path", repoKeySelector).Return("temp-credentials", nil)
},
resticTempCACertFileFunc: func(caCert []byte, bsl string, fs filesystem.Interface) (string, error) {
return "test-ca", nil
},
resticCmdEnvFunc: func(backupLocation *velerov1api.BackupStorageLocation, credentialFileStore credentials.FileStore) ([]string, error) {
return nil, nil
},
checkFunc: func(provider Provider, err error) {
assert.NoError(t, err)
assert.NotNil(t, provider)
},
},
{
name: "New provider with nil bsl",
emptyBSL: true,
mockCredFunc: func(credGetter *MockCredentialGetter, repoKeySelector *v1.SecretKeySelector) {
credGetter.On("Path", repoKeySelector).Return("temp-credentials", nil)
},
resticTempCACertFileFunc: func(caCert []byte, bsl string, fs filesystem.Interface) (string, error) {
return "test-ca", nil
},
resticCmdEnvFunc: func(backupLocation *velerov1api.BackupStorageLocation, credentialFileStore credentials.FileStore) ([]string, error) {
return nil, nil
},
checkFunc: func(provider Provider, err error) {
assert.NoError(t, err)
assert.NotNil(t, provider)
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
repoIdentifier := "my-repo"
bsl := &velerov1api.BackupStorageLocation{}
if !tc.emptyBSL {
bsl = builder.ForBackupStorageLocation("test-ns", "test-name").CACert([]byte("my-cert")).Result()
}
credGetter := &credentials.CredentialGetter{}
repoKeySelector := &v1.SecretKeySelector{}
log := logrus.New()
// Mock CredentialGetter
mockCredGetter := &MockCredentialGetter{}
credGetter.FromFile = mockCredGetter
tc.mockCredFunc(mockCredGetter, repoKeySelector)
if tc.resticCmdEnvFunc != nil {
resticCmdEnvFunc = tc.resticCmdEnvFunc
}
if tc.resticTempCACertFileFunc != nil {
resticTempCACertFileFunc = tc.resticTempCACertFileFunc
}
tc.checkFunc(NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log))
})
}
}

View File

@ -17,12 +17,14 @@ package kube
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)
@ -81,3 +83,30 @@ func DeletePodIfAny(ctx context.Context, podGetter corev1client.CoreV1Interface,
}
}
}
// EnsureDeletePod asserts the existence of a pod by name, deletes it and waits for its disappearance and returns errors on any failure
func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, timeout time.Duration) error {
err := podGetter.Pods(namespace).Delete(ctx, pod, metav1.DeleteOptions{})
if err != nil {
return errors.Wrapf(err, "error to delete pod %s", pod)
}
err = wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
_, err := podGetter.Pods(namespace).Get(ctx, pod, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
return false, errors.Wrapf(err, "error to get pod %s", pod)
}
return false, nil
})
if err != nil {
return errors.Wrapf(err, "error to assure pod is deleted, %s", pod)
}
return nil
}

93
pkg/util/kube/pod_test.go Normal file
View File

@ -0,0 +1,93 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
import (
"context"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
clientTesting "k8s.io/client-go/testing"
)
func TestEnsureDeletePod(t *testing.T) {
podObject := &corev1api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "fake-pod",
},
}
tests := []struct {
name string
clientObj []runtime.Object
podName string
namespace string
reactors []reactor
err string
}{
{
name: "delete fail",
podName: "fake-pod",
namespace: "fake-ns",
err: "error to delete pod fake-pod: pods \"fake-pod\" not found",
},
{
name: "wait fail",
podName: "fake-pod",
namespace: "fake-ns",
clientObj: []runtime.Object{podObject},
reactors: []reactor{
{
verb: "get",
resource: "pods",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-get-error")
},
},
},
err: "error to assure pod is deleted, fake-pod: error to get pod fake-pod: fake-get-error",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.clientObj...)
for _, reactor := range test.reactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
var kubeClient kubernetes.Interface = fakeKubeClient
err := EnsureDeletePod(context.Background(), kubeClient.CoreV1(), test.podName, test.namespace, time.Millisecond)
if err != nil {
assert.EqualError(t, err, test.err)
} else {
assert.NoError(t, err)
}
})
}
}

View File

@ -18,17 +18,23 @@ package kube
import (
"context"
"encoding/json"
"fmt"
"time"
jsonpatch "github.com/evanphx/json-patch"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
storagev1api "k8s.io/api/storage/v1"
storagev1 "k8s.io/client-go/kubernetes/typed/storage/v1"
)
const (
@ -77,3 +83,222 @@ func WaitPVCBound(ctx context.Context, pvcGetter corev1client.CoreV1Interface,
return pv, err
}
// DeletePVIfAny deletes a PV by name if it exists, and log an error when the deletion fails
func DeletePVIfAny(ctx context.Context, pvGetter corev1client.CoreV1Interface, pvName string, log logrus.FieldLogger) {
err := pvGetter.PersistentVolumes().Delete(ctx, pvName, metav1.DeleteOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
log.WithError(err).Debugf("Abort deleting PV, it doesn't exist, %s", pvName)
} else {
log.WithError(err).Errorf("Failed to delete PV %s", pvName)
}
}
}
// EnsureDeletePVC asserts the existence of a PVC by name, deletes it and waits for its disappearance and returns errors on any failure
func EnsureDeletePVC(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvc string, namespace string, timeout time.Duration) error {
err := pvcGetter.PersistentVolumeClaims(namespace).Delete(ctx, pvc, metav1.DeleteOptions{})
if err != nil {
return errors.Wrapf(err, "error to delete pvc %s", pvc)
}
err = wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
_, err := pvcGetter.PersistentVolumeClaims(namespace).Get(ctx, pvc, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
return false, errors.Wrapf(err, "error to get pvc %s", pvc)
}
return false, nil
})
if err != nil {
return errors.Wrapf(err, "error to retrieve pvc info for %s", pvc)
}
return nil
}
// RebindPVC rebinds a PVC by modifying its VolumeName to the specific PV
func RebindPVC(ctx context.Context, pvcGetter corev1client.CoreV1Interface,
pvc *corev1api.PersistentVolumeClaim, pv string) (*corev1api.PersistentVolumeClaim, error) {
origBytes, err := json.Marshal(pvc)
if err != nil {
return nil, errors.Wrap(err, "error marshaling original PVC")
}
updated := pvc.DeepCopy()
updated.Spec.VolumeName = pv
delete(updated.Annotations, KubeAnnBindCompleted)
delete(updated.Annotations, KubeAnnBoundByController)
updatedBytes, err := json.Marshal(updated)
if err != nil {
return nil, errors.Wrap(err, "error marshaling updated PV")
}
patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for PV")
}
updated, err = pvcGetter.PersistentVolumeClaims(pvc.Namespace).Patch(ctx, pvc.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching PVC")
}
return updated, nil
}
// ResetPVBinding resets the binding info of a PV and adds the required labels so as to make it ready for binding
func ResetPVBinding(ctx context.Context, pvGetter corev1client.CoreV1Interface, pv *corev1api.PersistentVolume, labels map[string]string) (*corev1api.PersistentVolume, error) {
origBytes, err := json.Marshal(pv)
if err != nil {
return nil, errors.Wrap(err, "error marshaling original PV")
}
updated := pv.DeepCopy()
updated.Spec.ClaimRef = nil
delete(updated.Annotations, KubeAnnBoundByController)
if labels != nil {
if updated.Labels == nil {
updated.Labels = make(map[string]string)
}
for k, v := range labels {
if _, ok := updated.Labels[k]; !ok {
updated.Labels[k] = v
}
}
}
updatedBytes, err := json.Marshal(updated)
if err != nil {
return nil, errors.Wrap(err, "error marshaling updated PV")
}
patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for PV")
}
updated, err = pvGetter.PersistentVolumes().Patch(ctx, pv.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching PV")
}
return updated, nil
}
// SetPVReclaimPolicy sets the specified reclaim policy to a PV
func SetPVReclaimPolicy(ctx context.Context, pvGetter corev1client.CoreV1Interface, pv *corev1api.PersistentVolume,
policy corev1api.PersistentVolumeReclaimPolicy) (*corev1api.PersistentVolume, error) {
if pv.Spec.PersistentVolumeReclaimPolicy == policy {
return nil, nil
}
origBytes, err := json.Marshal(pv)
if err != nil {
return nil, errors.Wrap(err, "error marshaling original PV")
}
updated := pv.DeepCopy()
updated.Spec.PersistentVolumeReclaimPolicy = policy
updatedBytes, err := json.Marshal(updated)
if err != nil {
return nil, errors.Wrap(err, "error marshaling updated PV")
}
patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for PV")
}
updated, err = pvGetter.PersistentVolumes().Patch(ctx, pv.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching PV")
}
return updated, nil
}
// WaitPVCConsumed waits for a PVC to be consumed by a pod so that the selected node is set by the pod scheduling; or does
// nothing if the consuming doesn't affect the PV provision.
// The latest PVC and the selected node will be returned.
func WaitPVCConsumed(ctx context.Context, pvcGetter corev1client.CoreV1Interface, pvc string, namespace string,
storageClient storagev1.StorageV1Interface, timeout time.Duration) (string, *corev1api.PersistentVolumeClaim, error) {
selectedNode := ""
var updated *corev1api.PersistentVolumeClaim
var storageClass *storagev1api.StorageClass
err := wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
tmpPVC, err := pvcGetter.PersistentVolumeClaims(namespace).Get(ctx, pvc, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, "error to get pvc %s/%s", namespace, pvc)
}
if tmpPVC.Spec.StorageClassName != nil && storageClass == nil {
storageClass, err = storageClient.StorageClasses().Get(ctx, *tmpPVC.Spec.StorageClassName, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, "error to get storage class %s", *tmpPVC.Spec.StorageClassName)
}
}
if storageClass != nil {
if storageClass.VolumeBindingMode != nil && *storageClass.VolumeBindingMode == storagev1api.VolumeBindingWaitForFirstConsumer {
selectedNode = tmpPVC.Annotations[KubeAnnSelectedNode]
if selectedNode == "" {
return false, nil
}
}
}
updated = tmpPVC
return true, nil
})
if err != nil {
return "", nil, errors.Wrap(err, "error to wait for PVC")
}
return selectedNode, updated, err
}
// WaitPVBound wait for binding of a PV specified by name and returns the bound PV object
func WaitPVBound(ctx context.Context, pvGetter corev1client.CoreV1Interface, pvName string, pvcName string, pvcNamespace string, timeout time.Duration) (*corev1api.PersistentVolume, error) {
var updated *corev1api.PersistentVolume
err := wait.PollImmediate(waitInternal, timeout, func() (bool, error) {
tmpPV, err := pvGetter.PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("failed to get pv %s", pvName))
}
if tmpPV.Spec.ClaimRef == nil {
return false, nil
}
if tmpPV.Spec.ClaimRef.Name != pvcName {
return false, nil
}
if tmpPV.Spec.ClaimRef.Namespace != pvcNamespace {
return false, nil
}
updated = tmpPV
return true, nil
})
if err != nil {
return nil, errors.Wrap(err, "error to wait for bound of PV")
} else {
return updated, nil
}
}

View File

@ -29,6 +29,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
corev1api "k8s.io/api/core/v1"
storagev1api "k8s.io/api/storage/v1"
clientTesting "k8s.io/client-go/testing"
@ -133,6 +134,160 @@ func TestWaitPVCBound(t *testing.T) {
}
}
func TestWaitPVCConsumed(t *testing.T) {
storageClass := "fake-storage-class"
bindModeImmediate := storagev1api.VolumeBindingImmediate
bindModeWait := storagev1api.VolumeBindingWaitForFirstConsumer
pvcObject := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-namespace",
Name: "fake-pvc-1",
},
}
pvcObjectWithSC := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-namespace",
Name: "fake-pvc-2",
},
Spec: corev1api.PersistentVolumeClaimSpec{
StorageClassName: &storageClass,
},
}
scObjWithoutBindMode := &storagev1api.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-storage-class",
},
}
scObjWaitBind := &storagev1api.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-storage-class",
},
VolumeBindingMode: &bindModeWait,
}
scObjWithImmidateBinding := &storagev1api.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-storage-class",
},
VolumeBindingMode: &bindModeImmediate,
}
pvcObjectWithSCAndAnno := &corev1api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-namespace",
Name: "fake-pvc-3",
Annotations: map[string]string{"volume.kubernetes.io/selected-node": "fake-node-1"},
},
Spec: corev1api.PersistentVolumeClaimSpec{
StorageClassName: &storageClass,
},
}
tests := []struct {
name string
pvcName string
pvcNamespace string
kubeClientObj []runtime.Object
kubeReactors []reactor
expectedPVC *corev1api.PersistentVolumeClaim
selectedNode string
err string
}{
{
name: "get pvc error",
pvcName: "fake-pvc",
pvcNamespace: "fake-namespace",
err: "error to wait for PVC: error to get pvc fake-namespace/fake-pvc: persistentvolumeclaims \"fake-pvc\" not found",
},
{
name: "success when no sc",
pvcName: "fake-pvc-1",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
pvcObject,
},
expectedPVC: pvcObject,
},
{
name: "get sc fail",
pvcName: "fake-pvc-2",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
pvcObjectWithSC,
},
err: "error to wait for PVC: error to get storage class fake-storage-class: storageclasses.storage.k8s.io \"fake-storage-class\" not found",
},
{
name: "success on sc without binding mode",
pvcName: "fake-pvc-2",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
pvcObjectWithSC,
scObjWithoutBindMode,
},
expectedPVC: pvcObjectWithSC,
},
{
name: "success on sc without immediate binding mode",
pvcName: "fake-pvc-2",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
pvcObjectWithSC,
scObjWithImmidateBinding,
},
expectedPVC: pvcObjectWithSC,
},
{
name: "pvc annotation miss",
pvcName: "fake-pvc-2",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
pvcObjectWithSC,
scObjWaitBind,
},
err: "error to wait for PVC: timed out waiting for the condition",
},
{
name: "success on sc without wait binding mode",
pvcName: "fake-pvc-3",
pvcNamespace: "fake-namespace",
kubeClientObj: []runtime.Object{
pvcObjectWithSCAndAnno,
scObjWaitBind,
},
expectedPVC: pvcObjectWithSCAndAnno,
selectedNode: "fake-node-1",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
var kubeClient kubernetes.Interface = fakeKubeClient
selectedNode, pvc, err := WaitPVCConsumed(context.Background(), kubeClient.CoreV1(), test.pvcName, test.pvcNamespace, kubeClient.StorageV1(), time.Millisecond)
if err != nil {
assert.EqualError(t, err, test.err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, test.expectedPVC, pvc)
assert.Equal(t, test.selectedNode, selectedNode)
})
}
}
func TestDeletePVCIfAny(t *testing.T) {
tests := []struct {
name string

View File

@ -47,6 +47,7 @@ const (
KubeAnnBoundByController = "pv.kubernetes.io/bound-by-controller"
KubeAnnDynamicallyProvisioned = "pv.kubernetes.io/provisioned-by"
KubeAnnMigratedTo = "pv.kubernetes.io/migrated-to"
KubeAnnSelectedNode = "volume.kubernetes.io/selected-node"
)
// NamespaceAndName returns a string in the format <namespace>/<name>

View File

@ -521,7 +521,7 @@ refer to [restic integration](#how-velero-integrates-with-restic) and [kopia int
Velero's FSB supports two data movement paths, the restic path and the kopia path. Velero allows users to select
between the two paths:
- For backup, the path is specified at the installation time through the `uploader-type` flag, the valid value is
either `restic` or `kopia`, or default to `restic` if the value is not specified. The selection is not allowed to be
either `restic` or `kopia`, or default to `kopia` if the value is not specified. The selection is not allowed to be
changed after the installation.
- For restore, the path is decided by the path used to back up the data, it is automatically selected. For example,
if you've created a backup with restic path, then you reinstall Velero with `uploader-type=kopia`, when you create

View File

@ -35,6 +35,11 @@ import (
func CreateNamespace(ctx context.Context, client TestClient, namespace string) error {
ns := builder.ForNamespace(namespace).Result()
// Add label to avoid PSA check.
ns.Labels = map[string]string{
"pod-security.kubernetes.io/enforce": "baseline",
"pod-security.kubernetes.io/enforce-version": "latest",
}
_, err := client.ClientGo.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
return nil
@ -45,6 +50,9 @@ func CreateNamespace(ctx context.Context, client TestClient, namespace string) e
func CreateNamespaceWithLabel(ctx context.Context, client TestClient, namespace string, label map[string]string) error {
ns := builder.ForNamespace(namespace).Result()
ns.Labels = label
// Add label to avoid PSA check.
ns.Labels["pod-security.kubernetes.io/enforce"] = "baseline"
ns.Labels["pod-security.kubernetes.io/enforce-version"] = "latest"
_, err := client.ClientGo.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
return nil
@ -54,6 +62,11 @@ func CreateNamespaceWithLabel(ctx context.Context, client TestClient, namespace
func CreateNamespaceWithAnnotation(ctx context.Context, client TestClient, namespace string, annotation map[string]string) error {
ns := builder.ForNamespace(namespace).Result()
// Add label to avoid PSA check.
ns.Labels = map[string]string{
"pod-security.kubernetes.io/enforce": "baseline",
"pod-security.kubernetes.io/enforce-version": "latest",
}
ns.ObjectMeta.Annotations = annotation
_, err := client.ClientGo.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {

View File

@ -210,6 +210,13 @@ func installKibishii(ctx context.Context, namespace string, cloudPlatform, veler
return errors.Wrapf(err, "failed to install kibishii, stderr=%s", stderr)
}
labelNamespaceCmd := exec.CommandContext(ctx, "kubectl", "label", "namespace", namespace, "pod-security.kubernetes.io/enforce=baseline", "pod-security.kubernetes.io/enforce-version=latest", "--overwrite=true")
_, stderr, err = veleroexec.RunCommand(labelNamespaceCmd)
fmt.Printf("Label namespace with PSA policy: %s\n", labelNamespaceCmd)
if err != nil {
return errors.Wrapf(err, "failed to label namespace with PSA policy, stderr=%s", stderr)
}
kibishiiSetWaitCmd := exec.CommandContext(ctx, "kubectl", "rollout", "status", "statefulset.apps/kibishii-deployment",
"-n", namespace, "-w", "--timeout=30m")
_, stderr, err = veleroexec.RunCommand(kibishiiSetWaitCmd)