Remove redundant client from restore controller. (#5759)

Signed-off-by: Xun Jiang <blackpiglet@gmail.com>
pull/5828/head
Xun Jiang/Bruce Jiang 2023-02-03 15:57:43 +08:00 committed by GitHub
parent 745d573dfa
commit 0b6b841f2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 180 additions and 267 deletions

View File

@ -0,0 +1 @@
Remove restore controller's redundant client.

View File

@ -674,7 +674,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
restoreControllerRunInfo := func() controllerRunInfo {
restorer, err := restore.NewKubernetesRestorer(
s.veleroClient.VeleroV1(),
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
s.config.restoreResourcePriorities,
@ -687,18 +686,15 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
s.kubeClient.CoreV1().RESTClient(),
s.credentialFileStore,
s.mgr.GetClient(),
)
cmd.CheckError(err)
restoreController := controller.NewRestoreController(
s.namespace,
s.sharedInformerFactory.Velero().V1().Restores(),
s.veleroClient.VeleroV1(),
s.veleroClient.VeleroV1(),
restorer,
s.sharedInformerFactory.Velero().V1().Backups().Lister(),
s.mgr.GetClient(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
s.logger,
s.logLevel,
newPluginManager,

View File

@ -28,7 +28,6 @@ import (
"sort"
"time"
jsonpatch "github.com/evanphx/json-patch"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -41,9 +40,7 @@ import (
hook "github.com/vmware-tanzu/velero/internal/hook"
api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
@ -91,18 +88,13 @@ var nonRestorableResources = []string{
type restoreController struct {
*genericController
namespace string
restoreClient velerov1client.RestoresGetter
podVolumeBackupClient velerov1client.PodVolumeBackupsGetter
restorer pkgrestore.Restorer
backupLister velerov1listers.BackupLister
restoreLister velerov1listers.RestoreLister
kbClient client.Client
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
restoreLogLevel logrus.Level
metrics *metrics.ServerMetrics
logFormat logging.Format
clock clock.Clock
namespace string
restorer pkgrestore.Restorer
kbClient client.Client
restoreLogLevel logrus.Level
metrics *metrics.ServerMetrics
logFormat logging.Format
clock clock.Clock
newPluginManager func(logger logrus.FieldLogger) clientmgmt.Manager
backupStoreGetter persistence.ObjectBackupStoreGetter
@ -111,12 +103,8 @@ type restoreController struct {
func NewRestoreController(
namespace string,
restoreInformer velerov1informers.RestoreInformer,
restoreClient velerov1client.RestoresGetter,
podVolumeBackupClient velerov1client.PodVolumeBackupsGetter,
restorer pkgrestore.Restorer,
backupLister velerov1listers.BackupLister,
kbClient client.Client,
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister,
logger logrus.FieldLogger,
restoreLogLevel logrus.Level,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
@ -125,19 +113,14 @@ func NewRestoreController(
logFormat logging.Format,
) Interface {
c := &restoreController{
genericController: newGenericController(Restore, logger),
namespace: namespace,
restoreClient: restoreClient,
podVolumeBackupClient: podVolumeBackupClient,
restorer: restorer,
backupLister: backupLister,
restoreLister: restoreInformer.Lister(),
kbClient: kbClient,
snapshotLocationLister: snapshotLocationLister,
restoreLogLevel: restoreLogLevel,
metrics: metrics,
logFormat: logFormat,
clock: &clock.RealClock{},
genericController: newGenericController(Restore, logger),
namespace: namespace,
restorer: restorer,
kbClient: kbClient,
restoreLogLevel: restoreLogLevel,
metrics: metrics,
logFormat: logFormat,
clock: &clock.RealClock{},
// use variables to refer to these functions so they can be
// replaced with fakes for testing.
@ -149,6 +132,7 @@ func NewRestoreController(
c.resyncFunc = c.resync
c.resyncPeriod = time.Minute
// restore informer cannot be removed, until restore controller adopt the controller-runtime framework.
restoreInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@ -179,11 +163,12 @@ func NewRestoreController(
}
func (c *restoreController) resync() {
restores, err := c.restoreLister.List(labels.Everything())
restoreList := &velerov1api.RestoreList{}
err := c.kbClient.List(context.Background(), restoreList, &client.ListOptions{})
if err != nil {
c.logger.Error(err, "Error computing restore_total metric")
} else {
c.metrics.SetRestoreTotal(int64(len(restores)))
c.metrics.SetRestoreTotal(int64(len(restoreList.Items)))
}
}
@ -199,7 +184,11 @@ func (c *restoreController) processQueueItem(key string) error {
}
log.Debug("Getting Restore")
restore, err := c.restoreLister.Restores(ns).Get(name)
restore := &velerov1api.Restore{}
err = c.kbClient.Get(context.Background(), client.ObjectKey{
Namespace: ns,
Name: name,
}, restore)
if err != nil {
return errors.Wrap(err, "error getting Restore")
}
@ -253,15 +242,14 @@ func (c *restoreController) processRestore(restore *api.Restore) error {
}
// patch to update status and persist to API
updatedRestore, err := patchRestore(original, restore, c.restoreClient)
err := kubeutil.PatchResource(original, restore, c.kbClient)
if err != nil {
// return the error so the restore can be re-processed; it's currently
// still in phase = New.
return errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase)
}
// store ref to just-updated item for creating patch
original = updatedRestore
restore = updatedRestore.DeepCopy()
original = restore.DeepCopy()
if restore.Status.Phase == api.RestorePhaseFailedValidation {
return nil
@ -284,7 +272,7 @@ func (c *restoreController) processRestore(restore *api.Restore) error {
restore.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
c.logger.Debug("Updating restore's final status")
if _, err = patchRestore(original, restore, c.restoreClient); err != nil {
if err = kubeutil.PatchResource(original, restore, c.kbClient); err != nil {
c.logger.WithError(errors.WithStack(err)).Info("Error updating restore's final status")
}
@ -360,16 +348,19 @@ func (c *restoreController) validateAndComplete(restore *api.Restore, pluginMana
velerov1api.ScheduleNameLabel: restore.Spec.ScheduleName,
}))
backups, err := c.backupLister.Backups(c.namespace).List(selector)
backupList := &velerov1api.BackupList{}
c.kbClient.List(context.Background(), backupList, &client.ListOptions{
LabelSelector: selector,
})
if err != nil {
restore.Status.ValidationErrors = append(restore.Status.ValidationErrors, "Unable to list backups for schedule")
return backupInfo{}
}
if len(backups) == 0 {
if len(backupList.Items) == 0 {
restore.Status.ValidationErrors = append(restore.Status.ValidationErrors, "No backups found for schedule")
}
if backup := mostRecentCompletedBackup(backups); backup != nil {
if backup := mostRecentCompletedBackup(backupList.Items); backup.Name != "" {
restore.Spec.BackupName = backup.Name
} else {
restore.Status.ValidationErrors = append(restore.Status.ValidationErrors, "No completed backups found for schedule")
@ -407,7 +398,7 @@ func backupXorScheduleProvided(restore *api.Restore) bool {
// mostRecentCompletedBackup returns the most recent backup that's
// completed from a list of backups.
func mostRecentCompletedBackup(backups []*api.Backup) *api.Backup {
func mostRecentCompletedBackup(backups []api.Backup) api.Backup {
sort.Slice(backups, func(i, j int) bool {
// Use .After() because we want descending sort.
@ -427,13 +418,14 @@ func mostRecentCompletedBackup(backups []*api.Backup) *api.Backup {
}
}
return nil
return api.Backup{}
}
// fetchBackupInfo checks the backup lister for a backup that matches the given name. If it doesn't
// find it, it returns an error.
func (c *restoreController) fetchBackupInfo(backupName string, pluginManager clientmgmt.Manager) (backupInfo, error) {
backup, err := c.backupLister.Backups(c.namespace).Get(backupName)
backup := &velerov1api.Backup{}
err := c.kbClient.Get(context.Background(), types.NamespacedName{Namespace: c.namespace, Name: backupName}, backup)
if err != nil {
return backupInfo{}, err
}
@ -492,10 +484,16 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
}
defer closeAndRemoveFile(backupFile, c.logger)
opts := label.NewListOptionsForBackup(restore.Spec.BackupName)
listOpts := &client.ListOptions{
LabelSelector: labels.Set(map[string]string{
velerov1api.BackupNameLabel: label.GetValidName(restore.Spec.BackupName),
}).AsSelector(),
}
podVolumeBackupList, err := c.podVolumeBackupClient.PodVolumeBackups(c.namespace).List(context.TODO(), opts)
podVolumeBackupList := &velerov1api.PodVolumeBackupList{}
err = c.kbClient.List(context.TODO(), podVolumeBackupList, listOpts)
if err != nil {
restoreLog.Errorf("Fail to list PodVolumeBackup :%s", err.Error())
return errors.WithStack(err)
}
@ -519,7 +517,7 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
BackupReader: backupFile,
}
restoreWarnings, restoreErrors := c.restorer.RestoreWithResolvers(restoreReq, actionsResolver, snapshotItemResolver,
c.snapshotLocationLister, pluginManager)
pluginManager)
// log errors and warnings to the restore log
for _, msg := range restoreErrors.Velero {
@ -641,30 +639,6 @@ func downloadToTempFile(backupName string, backupStore persistence.BackupStore,
return file, nil
}
func patchRestore(original, updated *api.Restore, client velerov1client.RestoresGetter) (*api.Restore, error) {
origBytes, err := json.Marshal(original)
if err != nil {
return nil, errors.Wrap(err, "error marshalling original restore")
}
updatedBytes, err := json.Marshal(updated)
if err != nil {
return nil, errors.Wrap(err, "error marshalling updated restore")
}
patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for restore")
}
res, err := client.Restores(original.Namespace).Patch(context.TODO(), original.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching restore")
}
return res, nil
}
type restoreLogger struct {
logrus.FieldLogger
file *os.File

View File

@ -19,7 +19,6 @@ package controller
import (
"bytes"
"context"
"encoding/json"
"io/ioutil"
"testing"
"time"
@ -30,17 +29,14 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
clocktesting "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/client"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions"
listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/metrics"
persistencemocks "github.com/vmware-tanzu/velero/pkg/persistence/mocks"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
@ -96,7 +92,7 @@ func TestFetchBackupInfo(t *testing.T) {
var (
client = fake.NewSimpleClientset()
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
restorer = &fakeRestorer{}
restorer = &fakeRestorer{kbClient: fakeClient}
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = velerotest.NewLogger()
pluginManager = &pluginmocks.Manager{}
@ -109,12 +105,8 @@ func TestFetchBackupInfo(t *testing.T) {
c := NewRestoreController(
velerov1api.DefaultNamespace,
sharedInformers.Velero().V1().Restores(),
client.VeleroV1(),
client.VeleroV1(),
restorer,
sharedInformers.Velero().V1().Backups().Lister(),
fakeClient,
sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(),
logger,
logrus.InfoLevel,
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
@ -125,11 +117,11 @@ func TestFetchBackupInfo(t *testing.T) {
if test.backupStoreError == nil {
for _, itm := range test.informerLocations {
require.NoError(t, fakeClient.Create(context.Background(), itm))
require.NoError(t, c.kbClient.Create(context.Background(), itm))
}
for _, itm := range test.informerBackups {
sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(itm)
assert.NoError(t, c.kbClient.Create(context.Background(), itm))
}
}
@ -149,7 +141,9 @@ func TestFetchBackupInfo(t *testing.T) {
info, err := c.fetchBackupInfo(test.backupName, pluginManager)
require.Equal(t, test.expectedErr, err != nil)
assert.Equal(t, test.expectedRes, info.backup)
if test.expectedRes != nil {
assert.Equal(t, test.expectedRes.Spec, info.backup.Spec)
}
})
}
}
@ -193,7 +187,8 @@ func TestProcessQueueItemSkips(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
var (
client = fake.NewSimpleClientset()
restorer = &fakeRestorer{}
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
restorer = &fakeRestorer{kbClient: fakeClient}
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = velerotest.NewLogger()
)
@ -201,12 +196,8 @@ func TestProcessQueueItemSkips(t *testing.T) {
c := NewRestoreController(
velerov1api.DefaultNamespace,
sharedInformers.Velero().V1().Restores(),
client.VeleroV1(),
client.VeleroV1(),
restorer,
sharedInformers.Velero().V1().Backups().Lister(),
nil,
sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(),
fakeClient,
logger,
logrus.InfoLevel,
nil,
@ -216,7 +207,7 @@ func TestProcessQueueItemSkips(t *testing.T) {
).(*restoreController)
if test.restore != nil {
sharedInformers.Velero().V1().Restores().Informer().GetStore().Add(test.restore)
c.kbClient.Create(context.Background(), test.restore)
}
err := c.processQueueItem(test.restoreKey)
@ -419,8 +410,8 @@ func TestProcessQueueItem(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
var (
client = fake.NewSimpleClientset()
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
restorer = &fakeRestorer{}
fakeClient = velerotest.NewFakeControllerRuntimeClientBuilder(t).Build()
restorer = &fakeRestorer{kbClient: fakeClient}
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = velerotest.NewLogger()
pluginManager = &pluginmocks.Manager{}
@ -437,12 +428,8 @@ func TestProcessQueueItem(t *testing.T) {
c := NewRestoreController(
velerov1api.DefaultNamespace,
sharedInformers.Velero().V1().Restores(),
client.VeleroV1(),
client.VeleroV1(),
restorer,
sharedInformers.Velero().V1().Backups().Lister(),
fakeClient,
sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(),
logger,
logrus.InfoLevel,
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
@ -451,59 +438,16 @@ func TestProcessQueueItem(t *testing.T) {
formatFlag,
).(*restoreController)
c.clock = clock.NewFakeClock(now)
c.clock = clocktesting.NewFakeClock(now)
if test.location != nil {
require.NoError(t, fakeClient.Create(context.Background(), test.location))
}
if test.backup != nil {
sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(test.backup)
assert.NoError(t, c.kbClient.Create(context.Background(), test.backup))
}
if test.restore != nil {
sharedInformers.Velero().V1().Restores().Informer().GetStore().Add(test.restore)
// this is necessary so the Patch() call returns the appropriate object
client.PrependReactor("patch", "restores", func(action core.Action) (bool, runtime.Object, error) {
if test.restore == nil {
return true, nil, nil
}
patch := action.(core.PatchAction).GetPatch()
patchMap := make(map[string]interface{})
if err := json.Unmarshal(patch, &patchMap); err != nil {
t.Logf("error unmarshalling patch: %s\n", err)
return false, nil, err
}
phase, found, err := unstructured.NestedString(patchMap, "status", "phase")
if err != nil {
t.Logf("error getting status.phase: %s\n", err)
return false, nil, err
}
if !found {
t.Logf("status.phase not found")
return false, nil, errors.New("status.phase not found")
}
res := test.restore.DeepCopy()
// these are the fields that we expect to be set by
// the controller
res.Status.Phase = velerov1api.RestorePhase(phase)
backupName, found, err := unstructured.NestedString(patchMap, "spec", "backupName")
if found {
res.Spec.BackupName = backupName
}
return true, res, nil
})
}
if test.backup != nil {
sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(test.backup)
require.NoError(t, c.kbClient.Create(context.Background(), test.restore))
}
var warnings, errors pkgrestore.Result
@ -564,14 +508,12 @@ func TestProcessQueueItem(t *testing.T) {
err = c.processQueueItem(key)
assert.Equal(t, test.expectedErr, err != nil, "got error %v", err)
actions := client.Actions()
if test.expectedPhase == "" {
require.Equal(t, 0, len(actions), "len(actions) should be zero")
return
}
// structs and func for decoding patch content
// struct and func for decoding patch content
type SpecPatch struct {
BackupName string `json:"backupName"`
}
@ -589,16 +531,6 @@ func TestProcessQueueItem(t *testing.T) {
Status StatusPatch `json:"status"`
}
decode := func(decoder *json.Decoder) (interface{}, error) {
actual := new(Patch)
err := decoder.Decode(actual)
return *actual, err
}
// validate Patch call 1 (setting phase, validation errs)
require.True(t, len(actions) > 0, "len(actions) is too small")
expected := Patch{
Status: StatusPatch{
Phase: velerov1api.RestorePhase(test.expectedPhase),
@ -616,8 +548,6 @@ func TestProcessQueueItem(t *testing.T) {
expected.Status.StartTimestamp = test.expectedStartTime
}
velerotest.ValidatePatch(t, actions[0], expected, decode)
// if we don't expect a restore, validate it wasn't called and exit the test
if test.expectedRestorerCall == nil {
assert.Empty(t, restorer.Calls)
@ -646,17 +576,16 @@ func TestProcessQueueItem(t *testing.T) {
}
}
velerotest.ValidatePatch(t, actions[2], expected, decode)
// explicitly capturing the argument passed to Restore myself because
// I want to validate the called arg as of the time of calling, but
// the mock stores the pointer, which gets modified after
assert.Equal(t, *test.expectedRestorerCall, restorer.calledWithArg)
assert.Equal(t, test.expectedRestorerCall.Spec, restorer.calledWithArg.Spec)
assert.Equal(t, test.expectedRestorerCall.Status.Phase, restorer.calledWithArg.Status.Phase)
})
}
}
func TestvalidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
formatFlag := logging.FormatText
var (
@ -664,21 +593,19 @@ func TestvalidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = velerotest.NewLogger()
pluginManager = &pluginmocks.Manager{}
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
backupStore = &persistencemocks.BackupStore{}
)
c := NewRestoreController(
velerov1api.DefaultNamespace,
sharedInformers.Velero().V1().Restores(),
client.VeleroV1(),
client.VeleroV1(),
nil,
sharedInformers.Velero().V1().Backups().Lister(),
nil,
sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(),
fakeClient,
logger,
logrus.DebugLevel,
nil,
nil, // backupStoreGetter
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
NewFakeSingleObjectBackupStoreGetter(backupStore),
nil,
formatFlag,
).(*restoreController)
@ -701,8 +628,8 @@ func TestvalidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
Result(),
))
errs := c.validateAndComplete(restore, pluginManager)
assert.Equal(t, []string{"No backups found for schedule"}, errs)
c.validateAndComplete(restore, pluginManager)
assert.Contains(t, restore.Status.ValidationErrors, "No backups found for schedule")
assert.Empty(t, restore.Spec.BackupName)
// no completed backups created from the schedule: fail validation
@ -716,37 +643,40 @@ func TestvalidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
Result(),
))
errs = c.validateAndComplete(restore, pluginManager)
assert.Equal(t, []string{"No completed backups found for schedule"}, errs)
c.validateAndComplete(restore, pluginManager)
assert.Contains(t, restore.Status.ValidationErrors, "No completed backups found for schedule")
assert.Empty(t, restore.Spec.BackupName)
// multiple completed backups created from the schedule: use most recent
now := time.Now()
require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(
require.NoError(t, c.kbClient.Create(context.Background(),
defaultBackup().
ObjectMeta(
builder.WithName("foo"),
builder.WithLabels(velerov1api.ScheduleNameLabel, "schedule-1"),
).
StorageLocation("default").
Phase(velerov1api.BackupPhaseCompleted).
StartTimestamp(now).
Result(),
))
require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(
defaultBackup().
ObjectMeta(
builder.WithName("foo"),
builder.WithLabels(velerov1api.ScheduleNameLabel, "schedule-1"),
).
Phase(velerov1api.BackupPhaseCompleted).
StartTimestamp(now.Add(time.Second)).
Result(),
))
errs = c.validateAndComplete(restore, pluginManager)
assert.Nil(t, errs)
assert.Equal(t, "bar", restore.Spec.BackupName)
location := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()
require.NoError(t, c.kbClient.Create(context.Background(), location))
restore = &velerov1api.Restore{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: "restore-1",
},
Spec: velerov1api.RestoreSpec{
ScheduleName: "schedule-1",
},
}
c.validateAndComplete(restore, pluginManager)
assert.Nil(t, restore.Status.ValidationErrors)
assert.Equal(t, "foo", restore.Spec.BackupName)
}
func TestBackupXorScheduleProvided(t *testing.T) {
@ -767,7 +697,7 @@ func TestBackupXorScheduleProvided(t *testing.T) {
}
func TestMostRecentCompletedBackup(t *testing.T) {
backups := []*velerov1api.Backup{
backups := []velerov1api.Backup{
{
ObjectMeta: metav1.ObjectMeta{
Name: "a",
@ -810,11 +740,11 @@ func TestMostRecentCompletedBackup(t *testing.T) {
},
}
assert.Nil(t, mostRecentCompletedBackup(backups))
assert.Empty(t, mostRecentCompletedBackup(backups).Name)
now := time.Now()
backups = append(backups, &velerov1api.Backup{
backups = append(backups, velerov1api.Backup{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
@ -824,7 +754,7 @@ func TestMostRecentCompletedBackup(t *testing.T) {
},
})
expected := &velerov1api.Backup{
expected := velerov1api.Backup{
ObjectMeta: metav1.ObjectMeta{
Name: "bar",
},
@ -857,12 +787,12 @@ func NewRestore(ns, name, backup, includeNS, includeResource string, phase veler
type fakeRestorer struct {
mock.Mock
calledWithArg velerov1api.Restore
kbClient client.Client
}
func (r *fakeRestorer) Restore(
info pkgrestore.Request,
actions []riav2.RestoreItemAction,
snapshotLocationLister listers.VolumeSnapshotLocationLister,
volumeSnapshotterGetter pkgrestore.VolumeSnapshotterGetter,
) (pkgrestore.Result, pkgrestore.Result) {
res := r.Called(info.Log, info.Restore, info.Backup, info.BackupReader, actions)
@ -875,11 +805,10 @@ func (r *fakeRestorer) Restore(
func (r *fakeRestorer) RestoreWithResolvers(req pkgrestore.Request,
resolver framework.RestoreItemActionResolverV2,
itemSnapshotterResolver framework.ItemSnapshotterResolver,
snapshotLocationLister listers.VolumeSnapshotLocationLister,
volumeSnapshotterGetter pkgrestore.VolumeSnapshotterGetter,
) (pkgrestore.Result, pkgrestore.Result) {
res := r.Called(req.Log, req.Restore, req.Backup, req.BackupReader, resolver, itemSnapshotterResolver,
snapshotLocationLister, volumeSnapshotterGetter)
r.kbClient, volumeSnapshotterGetter)
r.calledWithArg = *req.Restore

View File

@ -17,13 +17,16 @@ limitations under the License.
package restore
import (
"context"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/volume"
)
@ -39,7 +42,7 @@ type pvRestorer struct {
restorePVs *bool
volumeSnapshots []*volume.Snapshot
volumeSnapshotterGetter VolumeSnapshotterGetter
snapshotLocationLister listers.VolumeSnapshotLocationLister
kbclient client.Client
credentialFileStore credentials.FileStore
}
@ -61,7 +64,7 @@ func (r *pvRestorer) executePVAction(obj *unstructured.Unstructured) (*unstructu
log := r.logger.WithFields(logrus.Fields{"persistentVolume": pvName})
snapshotInfo, err := getSnapshotInfo(pvName, r.backup, r.volumeSnapshots, r.snapshotLocationLister, r.credentialFileStore, r.logger)
snapshotInfo, err := getSnapshotInfo(pvName, r.backup, r.volumeSnapshots, r.kbclient, r.credentialFileStore, r.logger)
if err != nil {
return nil, err
}
@ -105,7 +108,7 @@ type snapshotInfo struct {
location *api.VolumeSnapshotLocation
}
func getSnapshotInfo(pvName string, backup *api.Backup, volumeSnapshots []*volume.Snapshot, snapshotLocationLister listers.VolumeSnapshotLocationLister, credentialStore credentials.FileStore, logger logrus.FieldLogger) (*snapshotInfo, error) {
func getSnapshotInfo(pvName string, backup *api.Backup, volumeSnapshots []*volume.Snapshot, client client.Client, credentialStore credentials.FileStore, logger logrus.FieldLogger) (*snapshotInfo, error) {
var pvSnapshot *volume.Snapshot
for _, snapshot := range volumeSnapshots {
if snapshot.Spec.PersistentVolumeName == pvName {
@ -118,12 +121,18 @@ func getSnapshotInfo(pvName string, backup *api.Backup, volumeSnapshots []*volum
return nil, nil
}
loc, err := snapshotLocationLister.VolumeSnapshotLocations(backup.Namespace).Get(pvSnapshot.Spec.Location)
snapshotLocation := &api.VolumeSnapshotLocation{}
err := client.Get(
context.Background(),
types.NamespacedName{Namespace: backup.Namespace, Name: pvSnapshot.Spec.Location},
snapshotLocation,
)
if err != nil {
return nil, errors.WithStack(err)
}
// add credential to config
err = volume.UpdateVolumeSnapshotLocationWithCredentialConfig(loc, credentialStore, logger)
err = volume.UpdateVolumeSnapshotLocationWithCredentialConfig(snapshotLocation, credentialStore, logger)
if err != nil {
return nil, errors.WithStack(err)
}
@ -133,6 +142,6 @@ func getSnapshotInfo(pvName string, backup *api.Backup, volumeSnapshots []*volum
volumeType: pvSnapshot.Spec.VolumeType,
volumeAZ: pvSnapshot.Spec.VolumeAZ,
volumeIOPS: pvSnapshot.Spec.VolumeIOPS,
location: loc,
location: snapshotLocation,
}, nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package restore
import (
"context"
"testing"
"github.com/pkg/errors"
@ -121,9 +122,9 @@ func TestExecutePVAction_NoSnapshotRestores(t *testing.T) {
)
r := &pvRestorer{
logger: velerotest.NewLogger(),
restorePVs: tc.restore.Spec.RestorePVs,
snapshotLocationLister: snapshotLocationInformer.Lister(),
logger: velerotest.NewLogger(),
restorePVs: tc.restore.Spec.RestorePVs,
kbclient: velerotest.NewFakeControllerRuntimeClient(t),
}
if tc.backup != nil {
r.backup = tc.backup
@ -190,18 +191,18 @@ func TestExecutePVAction_SnapshotRestores(t *testing.T) {
volumeSnapshotterGetter = providerToVolumeSnapshotterMap(map[string]vsv1.VolumeSnapshotter{
tc.expectedProvider: volumeSnapshotter,
})
locationsInformer = informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0).Velero().V1().VolumeSnapshotLocations()
fakeClient = velerotest.NewFakeControllerRuntimeClientBuilder(t).Build()
)
for _, loc := range tc.locations {
require.NoError(t, locationsInformer.Informer().GetStore().Add(loc))
require.NoError(t, fakeClient.Create(context.Background(), loc))
}
r := &pvRestorer{
logger: velerotest.NewLogger(),
backup: tc.backup,
volumeSnapshots: tc.volumeSnapshots,
snapshotLocationLister: locationsInformer.Lister(),
kbclient: fakeClient,
volumeSnapshotterGetter: volumeSnapshotterGetter,
}

View File

@ -39,12 +39,12 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
crclient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/hook"
@ -53,8 +53,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
"github.com/vmware-tanzu/velero/pkg/features"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/plugin/framework"
@ -89,21 +87,18 @@ type Restorer interface {
// Restore restores the backup data from backupReader, returning warnings and errors.
Restore(req Request,
actions []riav2.RestoreItemAction,
snapshotLocationLister listers.VolumeSnapshotLocationLister,
volumeSnapshotterGetter VolumeSnapshotterGetter,
) (Result, Result)
RestoreWithResolvers(
req Request,
restoreItemActionResolver framework.RestoreItemActionResolverV2,
itemSnapshotterResolver framework.ItemSnapshotterResolver,
snapshotLocationLister listers.VolumeSnapshotLocationLister,
volumeSnapshotterGetter VolumeSnapshotterGetter,
) (Result, Result)
}
// kubernetesRestorer implements Restorer for restoring into a Kubernetes cluster.
type kubernetesRestorer struct {
restoreClient velerov1client.RestoresGetter
discoveryHelper discovery.Helper
dynamicFactory client.DynamicFactory
namespaceClient corev1.NamespaceInterface
@ -117,11 +112,11 @@ type kubernetesRestorer struct {
podCommandExecutor podexec.PodCommandExecutor
podGetter cache.Getter
credentialFileStore credentials.FileStore
kbClient crclient.Client
}
// NewKubernetesRestorer creates a new kubernetesRestorer.
func NewKubernetesRestorer(
restoreClient velerov1client.RestoresGetter,
discoveryHelper discovery.Helper,
dynamicFactory client.DynamicFactory,
resourcePriorities Priorities,
@ -133,9 +128,9 @@ func NewKubernetesRestorer(
podCommandExecutor podexec.PodCommandExecutor,
podGetter cache.Getter,
credentialStore credentials.FileStore,
kbClient crclient.Client,
) (Restorer, error) {
return &kubernetesRestorer{
restoreClient: restoreClient,
discoveryHelper: discoveryHelper,
dynamicFactory: dynamicFactory,
namespaceClient: namespaceClient,
@ -156,6 +151,7 @@ func NewKubernetesRestorer(
podCommandExecutor: podCommandExecutor,
podGetter: podGetter,
credentialFileStore: credentialStore,
kbClient: kbClient,
}, nil
}
@ -165,19 +161,17 @@ func NewKubernetesRestorer(
func (kr *kubernetesRestorer) Restore(
req Request,
actions []riav2.RestoreItemAction,
snapshotLocationLister listers.VolumeSnapshotLocationLister,
volumeSnapshotterGetter VolumeSnapshotterGetter,
) (Result, Result) {
resolver := framework.NewRestoreItemActionResolverV2(actions)
snapshotItemResolver := framework.NewItemSnapshotterResolver(nil)
return kr.RestoreWithResolvers(req, resolver, snapshotItemResolver, snapshotLocationLister, volumeSnapshotterGetter)
return kr.RestoreWithResolvers(req, resolver, snapshotItemResolver, volumeSnapshotterGetter)
}
func (kr *kubernetesRestorer) RestoreWithResolvers(
req Request,
restoreItemActionResolver framework.RestoreItemActionResolverV2,
itemSnapshotterResolver framework.ItemSnapshotterResolver,
snapshotLocationLister listers.VolumeSnapshotLocationLister,
volumeSnapshotterGetter VolumeSnapshotterGetter,
) (Result, Result) {
// metav1.LabelSelectorAsSelector converts a nil LabelSelector to a
@ -281,7 +275,7 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
restorePVs: req.Restore.Spec.RestorePVs,
volumeSnapshots: req.VolumeSnapshots,
volumeSnapshotterGetter: volumeSnapshotterGetter,
snapshotLocationLister: snapshotLocationLister,
kbclient: kr.kbClient,
credentialFileStore: kr.credentialFileStore,
}
@ -320,7 +314,7 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
waitExecHookHandler: waitExecHookHandler,
hooksContext: hooksCtx,
hooksCancelFunc: hooksCancelFunc,
restoreClient: kr.restoreClient,
kbClient: kr.kbClient,
}
return restoreCtx.execute()
@ -331,7 +325,6 @@ type restoreContext struct {
backupReader io.Reader
restore *velerov1api.Restore
restoreDir string
restoreClient velerov1client.RestoresGetter
resourceIncludesExcludes *collections.IncludesExcludes
resourceStatusIncludesExcludes *collections.IncludesExcludes
namespaceIncludesExcludes *collections.IncludesExcludes
@ -365,6 +358,7 @@ type restoreContext struct {
waitExecHookHandler hook.WaitExecHookHandler
hooksContext go_context.Context
hooksCancelFunc go_context.CancelFunc
kbClient crclient.Client
}
type resourceClientKey struct {
@ -459,18 +453,13 @@ func (ctx *restoreContext) execute() (Result, Result) {
lastUpdate = &val
case <-ticker.C:
if lastUpdate != nil {
patch := fmt.Sprintf(
`{"status":{"progress":{"totalItems":%d,"itemsRestored":%d}}}`,
lastUpdate.totalItems,
lastUpdate.itemsRestored,
)
_, err := ctx.restoreClient.Restores(ctx.restore.Namespace).Patch(
go_context.TODO(),
ctx.restore.Name,
types.MergePatchType,
[]byte(patch),
metav1.PatchOptions{},
)
updated := ctx.restore.DeepCopy()
if updated.Status.Progress == nil {
updated.Status.Progress = &velerov1api.RestoreProgress{}
}
updated.Status.Progress.TotalItems = lastUpdate.totalItems
updated.Status.Progress.ItemsRestored = lastUpdate.itemsRestored
err = kube.PatchResource(ctx.restore, updated, ctx.kbClient)
if err != nil {
ctx.log.WithError(errors.WithStack((err))).
Warn("Got error trying to update restore's status.progress")
@ -552,19 +541,14 @@ func (ctx *restoreContext) execute() (Result, Result) {
// Do a final progress update as stopping the ticker might have left last few
// updates from taking place.
patch := fmt.Sprintf(
`{"status":{"progress":{"totalItems":%d,"itemsRestored":%d}}}`,
len(ctx.restoredItems),
len(ctx.restoredItems),
)
updated := ctx.restore.DeepCopy()
if updated.Status.Progress == nil {
updated.Status.Progress = &velerov1api.RestoreProgress{}
}
updated.Status.Progress.TotalItems = len(ctx.restoredItems)
updated.Status.Progress.ItemsRestored = len(ctx.restoredItems)
_, err = ctx.restoreClient.Restores(ctx.restore.Namespace).Patch(
go_context.TODO(),
ctx.restore.Name,
types.MergePatchType,
[]byte(patch),
metav1.PatchOptions{},
)
err = kube.PatchResource(ctx.restore, updated, ctx.kbClient)
if err != nil {
ctx.log.WithError(errors.WithStack((err))).Warn("Updating restore status.progress")
}

View File

@ -45,7 +45,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
riav2 "github.com/vmware-tanzu/velero/pkg/plugin/velero/restoreitemaction/v2"
@ -54,6 +53,7 @@ import (
uploadermocks "github.com/vmware-tanzu/velero/pkg/podvolume/mocks"
"github.com/vmware-tanzu/velero/pkg/test"
testutil "github.com/vmware-tanzu/velero/pkg/test"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/util/kube"
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/volume"
@ -579,7 +579,6 @@ func TestRestoreResourceFiltering(t *testing.T) {
warnings, errs := h.restorer.Restore(
data,
nil, // restoreItemActions
nil, // snapshot location lister
nil, // volume snapshotter getter
)
@ -660,7 +659,6 @@ func TestRestoreNamespaceMapping(t *testing.T) {
warnings, errs := h.restorer.Restore(
data,
nil, // restoreItemActions
nil, // snapshot location lister
nil, // volume snapshotter getter
)
@ -745,7 +743,6 @@ func TestRestoreResourcePriorities(t *testing.T) {
warnings, errs := h.restorer.Restore(
data,
nil, // restoreItemActions
nil, // snapshot location lister
nil, // volume snapshotter getter
)
@ -823,7 +820,6 @@ func TestInvalidTarballContents(t *testing.T) {
warnings, errs := h.restorer.Restore(
data,
nil, // restoreItemActions
nil, // snapshot location lister
nil, // volume snapshotter getter
)
assertWantErrsOrWarnings(t, tc.wantWarnings, warnings)
@ -1128,7 +1124,6 @@ func TestRestoreItems(t *testing.T) {
warnings, errs := h.restorer.Restore(
data,
nil, // restoreItemActions
nil, // snapshot location lister
nil, // volume snapshotter getter
)
@ -1359,7 +1354,6 @@ func TestRestoreActionsRunForCorrectItems(t *testing.T) {
warnings, errs := h.restorer.Restore(
data,
actions,
nil, // snapshot location lister
nil, // volume snapshotter getter
)
@ -1535,7 +1529,6 @@ func TestRestoreActionModifications(t *testing.T) {
warnings, errs := h.restorer.Restore(
data,
tc.actions,
nil, // snapshot location lister
nil, // volume snapshotter getter
)
@ -1702,7 +1695,6 @@ func TestRestoreActionAdditionalItems(t *testing.T) {
warnings, errs := h.restorer.Restore(
data,
tc.actions,
nil, // snapshot location lister
nil, // volume snapshotter getter
)
@ -2666,10 +2658,9 @@ func TestRestorePersistentVolumes(t *testing.T) {
return renamed, nil
}
// set up the VolumeSnapshotLocation informer/lister and add test data to it
vslInformer := velerov1informers.NewSharedInformerFactory(h.VeleroClient, 0).Velero().V1().VolumeSnapshotLocations()
// set up the VolumeSnapshotLocation client and add test data to it
for _, vsl := range tc.volumeSnapshotLocations {
require.NoError(t, vslInformer.Informer().GetStore().Add(vsl))
require.NoError(t, h.restorer.kbClient.Create(context.Background(), vsl))
}
for _, r := range tc.apiResources {
@ -2697,7 +2688,6 @@ func TestRestorePersistentVolumes(t *testing.T) {
warnings, errs := h.restorer.Restore(
data,
nil, // restoreItemActions
vslInformer.Lister(),
tc.volumeSnapshotterGetter,
)
@ -2833,7 +2823,6 @@ func TestRestoreWithPodVolume(t *testing.T) {
warnings, errs := h.restorer.Restore(
data,
nil, // restoreItemActions
nil, // snapshot location lister
nil, // volume snapshotter getter
)
@ -3152,6 +3141,7 @@ func newHarness(t *testing.T) *harness {
apiServer := test.NewAPIServer(t)
log := logrus.StandardLogger()
kbClient := velerotest.NewFakeControllerRuntimeClient(t)
discoveryHelper, err := discovery.NewHelper(apiServer.DiscoveryClient, log)
require.NoError(t, err)
@ -3159,7 +3149,6 @@ func newHarness(t *testing.T) *harness {
return &harness{
APIServer: apiServer,
restorer: &kubernetesRestorer{
restoreClient: apiServer.VeleroClient.VeleroV1(),
discoveryHelper: discoveryHelper,
dynamicFactory: client.NewDynamicFactory(apiServer.DynamicClient),
namespaceClient: apiServer.KubeClient.CoreV1().Namespaces(),
@ -3170,6 +3159,7 @@ func newHarness(t *testing.T) *harness {
// unsupported
podVolumeRestorerFactory: nil,
podVolumeTimeout: 0,
kbClient: kbClient,
},
log: log,
}

29
pkg/util/kube/client.go Normal file
View File

@ -0,0 +1,29 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
import (
"context"
"sigs.k8s.io/controller-runtime/pkg/client"
)
func PatchResource(original, updated client.Object, kbClient client.Client) error {
err := kbClient.Patch(context.Background(), updated, client.MergeFrom(original))
return err
}