Added tracking for deleted namespace status check in restore flow (#8233)
* Added tracking for deleted namespace status check in restore flow
* Fixed unit test
* Refactored tracker execution and caller
* Added changelog
* Fixed linter issue
* Incorporated PR comments
* Resolved review comments

Author: sangitaray2021 <sangitaray@microsoft.com>
Date: Thu Sep 19 02:26:14 2024 +0530
Signed-off-by: sangitaray2021 <sangitaray@microsoft.com>
parent bef994e67a
commit 74790d9f60

Files changed under: changelogs/unreleased, pkg
@@ -0,0 +1 @@ (new changelog entry under changelogs/unreleased)
+Added tracking for deleted namespace status check in restore flow.
@@ -559,17 +559,18 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
 	}

 	restoreReq := &pkgrestore.Request{
 		Log: restoreLog,
 		Restore: restore,
 		Backup: info.backup,
 		PodVolumeBackups: podVolumeBackups,
 		VolumeSnapshots: volumeSnapshots,
 		BackupReader: backupFile,
 		ResourceModifiers: resourceModifiers,
 		DisableInformerCache: r.disableInformerCache,
 		CSIVolumeSnapshots: csiVolumeSnapshots,
 		BackupVolumeInfoMap: backupVolumeInfoMap,
 		RestoreVolumeInfoTracker: volume.NewRestoreVolInfoTracker(restore, restoreLog, r.globalCrClient),
+		ResourceDeletionStatusTracker: kubeutil.NewResourceDeletionStatusTracker(),
 	}
 	restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, pluginManager)
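The tracker is created once per validated restore and handed to the restorer through the request, so its state only lives for that restore. A minimal sketch of this wiring, assuming the velero module is available as a dependency and using the `pkg/restore` and `pkg/util/kube` import paths implied by this diff (all other request fields omitted for brevity):

```go
package main

import (
	"fmt"

	pkgrestore "github.com/vmware-tanzu/velero/pkg/restore"
	kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
)

func main() {
	// One tracker per restore request: namespaces whose deletion timed out are
	// remembered only for the lifetime of this restore.
	req := &pkgrestore.Request{
		ResourceDeletionStatusTracker: kubeutil.NewResourceDeletionStatusTracker(),
	}

	// Nothing has been tracked yet.
	fmt.Println(req.ResourceDeletionStatusTracker.Contains("Namespace", "demo-ns", "demo-ns")) // false
}
```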
@@ -29,6 +29,7 @@ import (
 	"github.com/vmware-tanzu/velero/internal/volume"
 	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
 	"github.com/vmware-tanzu/velero/pkg/itemoperation"
+	"github.com/vmware-tanzu/velero/pkg/util/kube"
 )

 const (
@@ -52,18 +53,19 @@ func resourceKey(obj runtime.Object) string {
 type Request struct {
 	*velerov1api.Restore

 	Log logrus.FieldLogger
 	Backup *velerov1api.Backup
 	PodVolumeBackups []*velerov1api.PodVolumeBackup
 	VolumeSnapshots []*volume.Snapshot
 	BackupReader io.Reader
 	RestoredItems map[itemKey]restoredItemStatus
 	itemOperationsList *[]*itemoperation.RestoreOperation
 	ResourceModifiers *resourcemodifiers.ResourceModifiers
 	DisableInformerCache bool
 	CSIVolumeSnapshots []*snapshotv1api.VolumeSnapshot
 	BackupVolumeInfoMap map[string]volume.BackupVolumeInfo
 	RestoreVolumeInfoTracker *volume.RestoreVolumeInfoTracker
+	ResourceDeletionStatusTracker kube.ResourceDeletionStatusTracker
 }

 type restoredItemStatus struct {
@@ -102,22 +102,23 @@ type Restorer interface {

 // kubernetesRestorer implements Restorer for restoring into a Kubernetes cluster.
 type kubernetesRestorer struct {
 	discoveryHelper discovery.Helper
 	dynamicFactory client.DynamicFactory
 	namespaceClient corev1.NamespaceInterface
 	podVolumeRestorerFactory podvolume.RestorerFactory
 	podVolumeTimeout time.Duration
 	resourceTerminatingTimeout time.Duration
 	resourceTimeout time.Duration
 	resourcePriorities types.Priorities
 	fileSystem filesystem.Interface
 	pvRenamer func(string) (string, error)
 	logger logrus.FieldLogger
 	podCommandExecutor podexec.PodCommandExecutor
 	podGetter cache.Getter
 	credentialFileStore credentials.FileStore
 	kbClient crclient.Client
 	multiHookTracker *hook.MultiHookTracker
+	resourceDeletionStatusTracker kube.ResourceDeletionStatusTracker
 }

 // NewKubernetesRestorer creates a new kubernetesRestorer.
@@ -323,6 +324,7 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
 		backupVolumeInfoMap: req.BackupVolumeInfoMap,
 		restoreVolumeInfoTracker: req.RestoreVolumeInfoTracker,
 		hooksWaitExecutor: hooksWaitExecutor,
+		resourceDeletionStatusTracker: req.ResourceDeletionStatusTracker,
 	}

 	return restoreCtx.execute()
@@ -371,6 +373,7 @@ type restoreContext struct {
 	backupVolumeInfoMap map[string]volume.BackupVolumeInfo
 	restoreVolumeInfoTracker *volume.RestoreVolumeInfoTracker
 	hooksWaitExecutor *hooksWaitExecutor
+	resourceDeletionStatusTracker kube.ResourceDeletionStatusTracker
 }

 type resourceClientKey struct {
@@ -718,6 +721,7 @@ func (ctx *restoreContext) processSelectedResource(
 			ns,
 			ctx.namespaceClient,
 			ctx.resourceTerminatingTimeout,
+			ctx.resourceDeletionStatusTracker,
 		)
 		if err != nil {
 			errs.AddVeleroError(err)
@@ -1119,7 +1123,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
 	// namespace into which the resource is being restored into exists.
 	// This is the *remapped* namespace that we are ensuring exists.
 	nsToEnsure := getNamespace(ctx.log, archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", obj.GetNamespace()), namespace)
-	_, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, ctx.namespaceClient, ctx.resourceTerminatingTimeout)
+	_, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, ctx.namespaceClient, ctx.resourceTerminatingTimeout, ctx.resourceDeletionStatusTracker)
 	if err != nil {
 		errs.AddVeleroError(err)
 		return warnings, errs, itemExists
@@ -58,6 +58,7 @@ import (
 	uploadermocks "github.com/vmware-tanzu/velero/pkg/podvolume/mocks"
 	"github.com/vmware-tanzu/velero/pkg/test"
 	"github.com/vmware-tanzu/velero/pkg/types"
+	"github.com/vmware-tanzu/velero/pkg/util/kube"
 	kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
 	. "github.com/vmware-tanzu/velero/pkg/util/results"
 )
@@ -2292,10 +2293,11 @@ func TestShouldRestore(t *testing.T) {
 			h := newHarness(t)

 			ctx := &restoreContext{
 				log: h.log,
 				dynamicFactory: client.NewDynamicFactory(h.DynamicClient),
 				namespaceClient: h.KubeClient.CoreV1().Namespaces(),
 				resourceTerminatingTimeout: time.Millisecond,
+				resourceDeletionStatusTracker: kube.NewResourceDeletionStatusTracker(),
 			}

 			for _, resource := range tc.apiResources {
@@ -3711,9 +3713,10 @@ func newHarness(t *testing.T) *harness {
 			fileSystem: test.NewFakeFileSystem(),

 			// unsupported
 			podVolumeRestorerFactory: nil,
 			podVolumeTimeout: 0,
 			kbClient: kbClient,
+			resourceDeletionStatusTracker: kube.NewResourceDeletionStatusTracker(),
 		},
 		log: log,
 	}
@@ -3900,9 +3903,10 @@ func TestIsAlreadyExistsError(t *testing.T) {
 			h := newHarness(t)

 			ctx := &restoreContext{
 				log: h.log,
 				dynamicFactory: client.NewDynamicFactory(h.DynamicClient),
 				namespaceClient: h.KubeClient.CoreV1().Namespaces(),
+				resourceDeletionStatusTracker: kube.NewResourceDeletionStatusTracker(),
 			}

 			if test.apiResource != nil {
@@ -4019,7 +4023,8 @@ func TestHasCSIVolumeSnapshot(t *testing.T) {
 			h := newHarness(t)

 			ctx := &restoreContext{
 				log: h.log,
+				resourceDeletionStatusTracker: kube.NewResourceDeletionStatusTracker(),
 			}

 			if tc.vs != nil {
@@ -4119,9 +4124,10 @@ func TestHasSnapshotDataUpload(t *testing.T) {
 			h := newHarness(t)

 			ctx := &restoreContext{
 				log: h.log,
 				kbClient: h.restorer.kbClient,
 				restore: tc.restore,
+				resourceDeletionStatusTracker: kube.NewResourceDeletionStatusTracker(),
 			}

 			if tc.duResult != nil {
@@ -0,0 +1,71 @@ (new file in package kube: ResourceDeletionStatusTracker)
/*
Copyright 2018 the Velero contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kube

import (
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
)

// ResourceDeletionStatusTracker keeps track of items pending deletion.
type ResourceDeletionStatusTracker interface {
	// Add informs the tracker that polling is in progress to check the namespace deletion status.
	Add(kind, ns, name string)
	// Delete informs the tracker that a namespace deletion is completed.
	Delete(kind, ns, name string)
	// Contains returns true if the tracker is tracking the namespace deletion progress.
	Contains(kind, ns, name string) bool
}

type resourceDeletionStatusTracker struct {
	lock sync.RWMutex
	isNameSpacePresentInPollingSet sets.Set[string]
}

// NewResourceDeletionStatusTracker returns a new ResourceDeletionStatusTracker.
func NewResourceDeletionStatusTracker() ResourceDeletionStatusTracker {
	return &resourceDeletionStatusTracker{
		isNameSpacePresentInPollingSet: sets.New[string](),
	}
}

func (bt *resourceDeletionStatusTracker) Add(kind, ns, name string) {
	bt.lock.Lock()
	defer bt.lock.Unlock()

	bt.isNameSpacePresentInPollingSet.Insert(resourceDeletionStatusTrackerKey(kind, ns, name))
}

func (bt *resourceDeletionStatusTracker) Delete(kind, ns, name string) {
	bt.lock.Lock()
	defer bt.lock.Unlock()

	bt.isNameSpacePresentInPollingSet.Delete(resourceDeletionStatusTrackerKey(kind, ns, name))
}

func (bt *resourceDeletionStatusTracker) Contains(kind, ns, name string) bool {
	bt.lock.RLock()
	defer bt.lock.RUnlock()

	return bt.isNameSpacePresentInPollingSet.Has(resourceDeletionStatusTrackerKey(kind, ns, name))
}

func resourceDeletionStatusTrackerKey(kind, ns, name string) string {
	return fmt.Sprintf("%s/%s/%s", kind, ns, name)
}
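For reference, a small usage example of the tracker defined above: Add marks a namespace as already being polled for deletion, Contains lets later callers short-circuit, and Delete clears the entry. It assumes the velero module (github.com/vmware-tanzu/velero) is available as a dependency:

```go
package main

import (
	"fmt"

	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

func main() {
	tracker := kube.NewResourceDeletionStatusTracker()

	// Record that we are already polling this namespace's deletion status.
	tracker.Add("Namespace", "demo-ns", "demo-ns")

	// A later restore item can check the tracker and skip its own polling.
	fmt.Println(tracker.Contains("Namespace", "demo-ns", "demo-ns")) // true

	// Once the namespace deletion is complete, the entry can be removed.
	tracker.Delete("Namespace", "demo-ns", "demo-ns")
	fmt.Println(tracker.Contains("Namespace", "demo-ns", "demo-ns")) // false
}
```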
@@ -74,11 +74,14 @@ func NamespaceAndName(objMeta metav1.Object) string {
 //
 // namespace already exists and is not ready, this function will return (false, false, nil).
 // If the namespace exists and is marked for deletion, this function will wait up to the timeout for it to fully delete.
-func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client corev1client.NamespaceInterface, timeout time.Duration) (ready bool, nsCreated bool, err error) {
+func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client corev1client.NamespaceInterface, timeout time.Duration, resourceDeletionStatusTracker ResourceDeletionStatusTracker) (ready bool, nsCreated bool, err error) {
 	// nsCreated tells whether the namespace was created by this method
 	// required for keeping track of number of restored items
 	// if namespace is marked for deletion, and we timed out, report an error
 	var terminatingNamespace bool
+
+	var namespaceAlreadyInDeletionTracker bool

 	err = wait.PollUntilContextTimeout(context.Background(), time.Second, timeout, true, func(ctx context.Context) (bool, error) {
 		clusterNS, err := client.Get(ctx, namespace.Name, metav1.GetOptions{})
 		// if namespace is marked for deletion, and we timed out, report an error
@@ -92,8 +95,12 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core
 			// Return the err and exit the loop.
 			return true, err
 		}

 		if clusterNS != nil && (clusterNS.GetDeletionTimestamp() != nil || clusterNS.Status.Phase == corev1api.NamespaceTerminating) {
+			if resourceDeletionStatusTracker.Contains(clusterNS.Kind, clusterNS.Name, clusterNS.Name) {
+				namespaceAlreadyInDeletionTracker = true
+				return true, errors.Errorf("namespace %s is already present in the polling set, skipping execution", namespace.Name)
+			}
 			// Marked for deletion, keep waiting
 			terminatingNamespace = true
 			return false, nil
@@ -107,7 +114,12 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core
 	// err will be set if we timed out or encountered issues retrieving the namespace,
 	if err != nil {
 		if terminatingNamespace {
+			// If the namespace is marked for deletion and we timed out, add it to the tracker.
+			resourceDeletionStatusTracker.Add(namespace.Kind, namespace.Name, namespace.Name)
 			return false, nsCreated, errors.Wrapf(err, "timed out waiting for terminating namespace %s to disappear before restoring", namespace.Name)
-		}
+		} else if namespaceAlreadyInDeletionTracker {
+			// If the namespace is already in the tracker, return an error.
+			return false, nsCreated, errors.Wrapf(err, "skipping polling for terminating namespace %s", namespace.Name)
+		}
 		return false, nsCreated, errors.Wrapf(err, "error getting namespace %s", namespace.Name)
 	}
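Taken together, the three hunks above make EnsureNamespaceExistsAndIsReady fail fast for a namespace already known to be stuck terminating, instead of polling it again, and add a namespace that times out to the tracker for subsequent items. A simplified, self-contained sketch of that branch (waitForTerminatingNamespace and its arguments are illustrative, not part of Velero; only the tracker API comes from the diff above):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// waitForTerminatingNamespace mimics the terminating-namespace branch of
// EnsureNamespaceExistsAndIsReady: fail fast if the namespace is already
// tracked, otherwise poll until the timeout and then record it in the tracker.
func waitForTerminatingNamespace(name string, stillTerminating func() bool, timeout time.Duration, tracker kube.ResourceDeletionStatusTracker) error {
	if tracker.Contains("Namespace", name, name) {
		return fmt.Errorf("namespace %s is already present in the polling set, skipping execution", name)
	}

	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if !stillTerminating() {
			return nil // deletion finished; the caller can recreate the namespace
		}
		time.Sleep(100 * time.Millisecond)
	}

	// Timed out: remember this namespace so later restore items skip the wait.
	tracker.Add("Namespace", name, name)
	return errors.New("timed out waiting for terminating namespace " + name)
}

func main() {
	tracker := kube.NewResourceDeletionStatusTracker()
	stillTerminating := func() bool { return true } // simulate a namespace that never finishes deleting

	// The first item waits for the full timeout and then records the namespace.
	fmt.Println(waitForTerminatingNamespace("demo-ns", stillTerminating, 300*time.Millisecond, tracker))
	// Later items short-circuit because the namespace is now tracked.
	fmt.Println(waitForTerminatingNamespace("demo-ns", stillTerminating, 300*time.Millisecond, tracker))
}
```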
@@ -47,14 +47,16 @@ func TestNamespaceAndName(t *testing.T) {

 func TestEnsureNamespaceExistsAndIsReady(t *testing.T) {
 	tests := []struct {
 		name string
 		expectNSFound bool
 		nsPhase corev1.NamespacePhase
 		nsDeleting bool
 		expectCreate bool
 		alreadyExists bool
 		expectedResult bool
 		expectedCreatedResult bool
+		nsAlreadyInTerminationTracker bool
+		ResourceDeletionStatusTracker ResourceDeletionStatusTracker
 	}{
 		{
 			name: "namespace found, not deleting",
@@ -95,8 +97,17 @@ func TestEnsureNamespaceExistsAndIsReady(t *testing.T) {
 			expectedResult: false,
 			expectedCreatedResult: false,
 		},
+		{
+			name: "same namespace found earlier, terminating phase already tracked",
+			expectNSFound: true,
+			nsPhase: corev1.NamespaceTerminating,
+			expectedResult: false,
+			expectedCreatedResult: false,
+			nsAlreadyInTerminationTracker: true,
+		},
 	}

+	resourceDeletionStatusTracker := NewResourceDeletionStatusTracker()
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			namespace := &corev1.Namespace{
@@ -132,7 +143,11 @@ func TestEnsureNamespaceExistsAndIsReady(t *testing.T) {
 				nsClient.On("Create", namespace).Return(namespace, nil)
 			}

-			result, nsCreated, _ := EnsureNamespaceExistsAndIsReady(namespace, nsClient, timeout)
+			if test.nsAlreadyInTerminationTracker {
+				resourceDeletionStatusTracker.Add(namespace.Kind, "test", "test")
+			}
+
+			result, nsCreated, _ := EnsureNamespaceExistsAndIsReady(namespace, nsClient, timeout, resourceDeletionStatusTracker)

 			assert.Equal(t, test.expectedResult, result)
 			assert.Equal(t, test.expectedCreatedResult, nsCreated)