Merge pull request #8366 from sseago/synchronise-backedupitems

Make BackedUpItems thread safe
pull/8486/head
Shubham Pampattiwar 2024-12-04 07:50:45 -08:00 committed by GitHub
commit 6c0ed1e5d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 154 additions and 44 deletions

View File

@ -0,0 +1 @@
Make BackedUpItems thread safe

View File

@ -0,0 +1,90 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backup
import (
"fmt"
"sort"
"sync"
)
// backedUpItemsMap keeps track of the items already backed up for the current Velero Backup
// The embedded RWMutex makes the set safe for concurrent use by the goroutines
// that back up item blocks in parallel.
type backedUpItemsMap struct {
// Guards all access to backedUpItems. Embedded as a pointer so the
// promoted Lock/RLock methods are shared by every copy of the struct.
*sync.RWMutex
// Set of item identifiers already written to the backup tarball.
backedUpItems map[itemKey]struct{}
}
// NewBackedUpItemsMap constructs an empty, ready-to-use backedUpItemsMap.
func NewBackedUpItemsMap() *backedUpItemsMap {
	m := &backedUpItemsMap{
		backedUpItems: map[itemKey]struct{}{},
	}
	m.RWMutex = new(sync.RWMutex)
	return m
}
// CopyItemMap returns a point-in-time snapshot of the backed-up item set.
// Callers may read or iterate the returned map without holding any lock.
func (m *backedUpItemsMap) CopyItemMap() map[itemKey]struct{} {
	m.RLock()
	defer m.RUnlock()

	snapshot := make(map[itemKey]struct{}, len(m.backedUpItems))
	for k, v := range m.backedUpItems {
		snapshot[k] = v
	}
	return snapshot
}
// ResourceMap returns a map of the backed up items.
// For each map entry, the key is the resource type, and the value is a
// sorted list of namespaced names ("namespace/name", or just "name" for
// cluster-scoped items) for that resource.
func (m *backedUpItemsMap) ResourceMap() map[string][]string {
	m.RLock()
	defer m.RUnlock()

	resources := make(map[string][]string)
	for key := range m.backedUpItems {
		entry := key.name
		if key.namespace != "" {
			entry = fmt.Sprintf("%s/%s", key.namespace, key.name)
		}
		resources[key.resource] = append(resources[key.resource], entry)
	}

	// sort namespace/name entries for each GVK
	for _, entries := range resources {
		sort.Strings(entries)
	}
	return resources
}
// Len reports how many distinct items have been backed up so far.
func (m *backedUpItemsMap) Len() int {
	m.RLock()
	count := len(m.backedUpItems)
	m.RUnlock()
	return count
}
// Has reports whether the given item has already been backed up.
func (m *backedUpItemsMap) Has(key itemKey) bool {
	m.RLock()
	_, found := m.backedUpItems[key]
	m.RUnlock()
	return found
}
// AddItem records the given item as backed up. Adding an item that is
// already present is a no-op.
func (m *backedUpItemsMap) AddItem(key itemKey) {
	m.Lock()
	m.backedUpItems[key] = struct{}{}
	m.Unlock()
}

View File

@ -297,8 +297,6 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
return err
}
backupRequest.BackedUpItems = map[itemKey]struct{}{}
podVolumeTimeout := kb.podVolumeTimeout
if val := backupRequest.Annotations[velerov1api.PodVolumeOperationTimeoutAnnotation]; val != "" {
parsed, err := time.ParseDuration(val)
@ -499,12 +497,13 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
// updated total is computed as "how many items we've backed up so far, plus
// how many items we know of that are remaining"
totalItems := len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
backedUpItems := backupRequest.BackedUpItems.Len()
totalItems := backedUpItems + (len(items) - (i + 1))
// send a progress update
update <- progressUpdate{
totalItems: totalItems,
itemsBackedUp: len(backupRequest.BackedUpItems),
itemsBackedUp: backedUpItems,
}
log.WithFields(map[string]interface{}{
@ -512,7 +511,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
"resource": items[i].groupResource.String(),
"namespace": items[i].namespace,
"name": items[i].name,
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", len(backupRequest.BackedUpItems), totalItems)
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems)
}
// no more progress updates will be sent on the 'update' channel
@ -538,8 +537,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
if updated.Status.Progress == nil {
updated.Status.Progress = &velerov1api.BackupProgress{}
}
updated.Status.Progress.TotalItems = len(backupRequest.BackedUpItems)
updated.Status.Progress.ItemsBackedUp = len(backupRequest.BackedUpItems)
backedUpItems := backupRequest.BackedUpItems.Len()
updated.Status.Progress.TotalItems = backedUpItems
updated.Status.Progress.ItemsBackedUp = backedUpItems
// update the hooks execution status
if updated.Status.HookStatus == nil {
@ -558,8 +558,8 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
log.Infof("Summary for skipped PVs: %s", skippedPVSummary)
}
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(backupRequest.BackedUpItems), ItemsBackedUp: len(backupRequest.BackedUpItems)}
log.WithField("progress", "").Infof("Backed up a total of %d items", len(backupRequest.BackedUpItems))
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: backedUpItems, ItemsBackedUp: backedUpItems}
log.WithField("progress", "").Infof("Backed up a total of %d items", backedUpItems)
return nil
}
@ -667,7 +667,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
continue
}
// Don't run hooks if pod has already been backed up
if _, exists := itemBlock.itemBackupper.backupRequest.BackedUpItems[key]; !exists {
if !itemBlock.itemBackupper.backupRequest.BackedUpItems.Has(key) {
preHookPods = append(preHookPods, item)
}
}
@ -681,7 +681,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
itemBlock.Log.WithError(errors.WithStack(err)).Error("Error accessing pod metadata")
continue
}
itemBlock.itemBackupper.backupRequest.BackedUpItems[key] = struct{}{}
itemBlock.itemBackupper.backupRequest.BackedUpItems.AddItem(key)
}
itemBlock.Log.Debug("Backing up items in BackupItemBlock")
@ -861,8 +861,6 @@ func (kb *kubernetesBackupper) FinalizeBackup(
return err
}
backupRequest.BackedUpItems = map[itemKey]struct{}{}
// set up a temp dir for the itemCollector to use to temporarily
// store items as they're scraped from the API.
tempDir, err := os.MkdirTemp("", "")
@ -947,14 +945,15 @@ func (kb *kubernetesBackupper) FinalizeBackup(
// updated total is computed as "how many items we've backed up so far, plus
// how many items we know of that are remaining"
totalItems := len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
backedUpItems := backupRequest.BackedUpItems.Len()
totalItems := backedUpItems + (len(items) - (i + 1))
log.WithFields(map[string]interface{}{
"progress": "",
"resource": item.groupResource.String(),
"namespace": item.namespace,
"name": item.name,
}).Infof("Updated %d items out of an estimated total of %d (estimate will change throughout the backup finalizer)", len(backupRequest.BackedUpItems), totalItems)
}).Infof("Updated %d items out of an estimated total of %d (estimate will change throughout the backup finalizer)", backedUpItems, totalItems)
}
volumeInfos, err := backupStore.GetBackupVolumeInfos(backupRequest.Backup.Name)
@ -979,7 +978,7 @@ func (kb *kubernetesBackupper) FinalizeBackup(
return err
}
log.WithField("progress", "").Infof("Updated a total of %d items", len(backupRequest.BackedUpItems))
log.WithField("progress", "").Infof("Updated a total of %d items", backupRequest.BackedUpItems.Len())
return nil
}

View File

@ -76,6 +76,7 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) {
req := &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile := bytes.NewBuffer([]byte{})
@ -103,7 +104,7 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) {
// go through BackedUpItems after the backup to assemble the list of files we
// expect to see in the tarball and compare to see if they match
var expectedFiles []string
for item := range req.BackedUpItems {
for item := range req.BackedUpItems.CopyItemMap() {
file := "resources/" + gvkToResource[item.resource]
if item.namespace != "" {
file = file + "/namespaces/" + item.namespace
@ -135,6 +136,7 @@ func TestBackupProgressIsUpdated(t *testing.T) {
req := &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile := bytes.NewBuffer([]byte{})
@ -159,8 +161,8 @@ func TestBackupProgressIsUpdated(t *testing.T) {
h.backupper.Backup(h.log, req, backupFile, nil, nil, nil)
require.NotNil(t, req.Status.Progress)
assert.Len(t, req.BackedUpItems, req.Status.Progress.TotalItems)
assert.Len(t, req.BackedUpItems, req.Status.Progress.ItemsBackedUp)
assert.Equal(t, req.BackedUpItems.Len(), req.Status.Progress.TotalItems)
assert.Equal(t, req.BackedUpItems.Len(), req.Status.Progress.ItemsBackedUp)
}
// TestBackupOldResourceFiltering runs backups with different combinations
@ -871,6 +873,7 @@ func TestBackupOldResourceFiltering(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1048,6 +1051,7 @@ func TestCRDInclusion(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1143,6 +1147,7 @@ func TestBackupResourceCohabitation(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1169,6 +1174,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
backup1 := &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backup1File := bytes.NewBuffer([]byte{})
@ -1183,6 +1189,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
backup2 := &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backup2File := bytes.NewBuffer([]byte{})
@ -1233,6 +1240,7 @@ func TestBackupResourceOrdering(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1349,6 +1357,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) {
backupReq: &Request{
Backup: defaultBackup().SnapshotVolumes(false).Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
resPolicies: &resourcepolicies.ResourcePolicies{
Version: "v1",
@ -1395,6 +1404,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) {
},
includedPVs: map[string]struct{}{},
},
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.PVCs(
@ -1641,6 +1651,7 @@ func TestBackupActionsRunForCorrectItems(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1722,6 +1733,7 @@ func TestBackupWithInvalidActions(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1872,6 +1884,7 @@ func TestBackupActionModifications(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -2128,6 +2141,7 @@ func TestBackupActionAdditionalItems(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -2385,6 +2399,7 @@ func TestItemBlockActionsRunForCorrectItems(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -2466,6 +2481,7 @@ func TestBackupWithInvalidItemBlockActions(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -2718,6 +2734,7 @@ func TestItemBlockActionRelatedItems(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -2882,6 +2899,7 @@ func TestBackupWithSnapshots(t *testing.T) {
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -2916,6 +2934,7 @@ func TestBackupWithSnapshots(t *testing.T) {
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -2951,6 +2970,7 @@ func TestBackupWithSnapshots(t *testing.T) {
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -2986,6 +3006,7 @@ func TestBackupWithSnapshots(t *testing.T) {
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3021,6 +3042,7 @@ func TestBackupWithSnapshots(t *testing.T) {
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3054,6 +3076,7 @@ func TestBackupWithSnapshots(t *testing.T) {
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3070,6 +3093,7 @@ func TestBackupWithSnapshots(t *testing.T) {
req: &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3089,6 +3113,7 @@ func TestBackupWithSnapshots(t *testing.T) {
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3106,6 +3131,7 @@ func TestBackupWithSnapshots(t *testing.T) {
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3126,6 +3152,7 @@ func TestBackupWithSnapshots(t *testing.T) {
newSnapshotLocation("velero", "another", "another"),
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3256,6 +3283,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
req: &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.Pods(
@ -3286,6 +3314,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
req: &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.Pods(
@ -3316,6 +3345,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
req: &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
},
apiResources: []*test.APIResource{
test.Pods(
@ -3398,6 +3428,7 @@ func TestBackupWithInvalidHooks(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -3868,6 +3899,7 @@ func TestBackupWithHooks(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
podCommandExecutor = new(test.MockPodCommandExecutor)
@ -4071,6 +4103,7 @@ func TestBackupWithPodVolume(t *testing.T) {
Backup: tc.backup,
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{tc.vsl},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -5181,6 +5214,7 @@ func TestBackupNewResourceFiltering(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -5342,6 +5376,7 @@ func TestBackupNamespaces(t *testing.T) {
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
}
backupFile = bytes.NewBuffer([]byte{})
)

View File

@ -175,12 +175,12 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
name: name,
}
if _, exists := ib.backupRequest.BackedUpItems[key]; exists {
if ib.backupRequest.BackedUpItems.Has(key) {
log.Info("Skipping item because it's already been backed up.")
// returning true since this item *is* in the backup, even though we're not backing it up here
return true, itemFiles, nil
}
ib.backupRequest.BackedUpItems[key] = struct{}{}
ib.backupRequest.BackedUpItems.AddItem(key)
log.Info("Backing up item")
var (

View File

@ -17,9 +17,6 @@ limitations under the License.
package backup
import (
"fmt"
"sort"
"github.com/vmware-tanzu/velero/internal/hook"
"github.com/vmware-tanzu/velero/internal/resourcepolicies"
"github.com/vmware-tanzu/velero/internal/volume"
@ -49,7 +46,7 @@ type Request struct {
ResolvedItemBlockActions []framework.ItemBlockResolvedAction
VolumeSnapshots []*volume.Snapshot
PodVolumeBackups []*velerov1api.PodVolumeBackup
BackedUpItems map[itemKey]struct{}
BackedUpItems *backedUpItemsMap
itemOperationsList *[]*itemoperation.BackupOperation
ResPolicies *resourcepolicies.Policies
SkippedPVTracker *skipPVTracker
@ -71,21 +68,7 @@ func (r *Request) GetItemOperationsList() *[]*itemoperation.BackupOperation {
// BackupResourceList returns the list of backed up resources grouped by the API
// Version and Kind
func (r *Request) BackupResourceList() map[string][]string {
resources := map[string][]string{}
for i := range r.BackedUpItems {
entry := i.name
if i.namespace != "" {
entry = fmt.Sprintf("%s/%s", i.namespace, i.name)
}
resources[i.resource] = append(resources[i.resource], entry)
}
// sort namespace/name entries for each GVK
for _, v := range resources {
sort.Strings(v)
}
return resources
return r.BackedUpItems.ResourceMap()
}
func (r *Request) FillVolumesInformation() {

View File

@ -44,9 +44,9 @@ func TestRequest_BackupResourceList(t *testing.T) {
name: "my-pv",
},
}
backedUpItems := map[itemKey]struct{}{}
backedUpItems := NewBackedUpItemsMap()
for _, it := range items {
backedUpItems[it] = struct{}{}
backedUpItems.AddItem(it)
}
req := Request{BackedUpItems: backedUpItems}
@ -70,9 +70,9 @@ func TestRequest_BackupResourceListEntriesSorted(t *testing.T) {
namespace: "ns1",
},
}
backedUpItems := map[itemKey]struct{}{}
backedUpItems := NewBackedUpItemsMap()
for _, it := range items {
backedUpItems[it] = struct{}{}
backedUpItems.AddItem(it)
}
req := Request{BackedUpItems: backedUpItems}

View File

@ -328,6 +328,7 @@ func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logg
request := &pkgbackup.Request{
Backup: backup.DeepCopy(), // don't modify items in the cache
SkippedPVTracker: pkgbackup.NewSkipPVTracker(),
BackedUpItems: pkgbackup.NewBackedUpItemsMap(),
}
request.VolumesInformation.Init()

View File

@ -160,6 +160,7 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
Backup: backup,
StorageLocation: location,
SkippedPVTracker: pkgbackup.NewSkipPVTracker(),
BackedUpItems: pkgbackup.NewBackedUpItemsMap(),
}
var outBackupFile *os.File
if len(operations) > 0 {