Make BackedUpItems thread safe
Signed-off-by: Scott Seago <sseago@redhat.com>pull/8366/head
parent
3c06fc8d87
commit
015b1e69f6
|
@ -0,0 +1 @@
|
||||||
|
Make BackedUpItems thread safe
|
|
@ -0,0 +1,90 @@
|
||||||
|
/*
|
||||||
|
Copyright the Velero contributors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package backup
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// backedUpItemsMap keeps track of the items already backed up for the current Velero Backup
|
||||||
|
type backedUpItemsMap struct {
|
||||||
|
*sync.RWMutex
|
||||||
|
backedUpItems map[itemKey]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBackedUpItemsMap() *backedUpItemsMap {
|
||||||
|
return &backedUpItemsMap{
|
||||||
|
RWMutex: &sync.RWMutex{},
|
||||||
|
backedUpItems: make(map[itemKey]struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *backedUpItemsMap) CopyItemMap() map[itemKey]struct{} {
|
||||||
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
returnMap := make(map[itemKey]struct{}, len(m.backedUpItems))
|
||||||
|
for key, val := range m.backedUpItems {
|
||||||
|
returnMap[key] = val
|
||||||
|
}
|
||||||
|
return returnMap
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResourceMap returns a map of the backed up items.
|
||||||
|
// For each map entry, the key is the resource type,
|
||||||
|
// and the value is a list of namespaced names for the resource.
|
||||||
|
func (m *backedUpItemsMap) ResourceMap() map[string][]string {
|
||||||
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
|
||||||
|
resources := map[string][]string{}
|
||||||
|
for i := range m.backedUpItems {
|
||||||
|
entry := i.name
|
||||||
|
if i.namespace != "" {
|
||||||
|
entry = fmt.Sprintf("%s/%s", i.namespace, i.name)
|
||||||
|
}
|
||||||
|
resources[i.resource] = append(resources[i.resource], entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// sort namespace/name entries for each GVK
|
||||||
|
for _, v := range resources {
|
||||||
|
sort.Strings(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return resources
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *backedUpItemsMap) Len() int {
|
||||||
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
return len(m.backedUpItems)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *backedUpItemsMap) Has(key itemKey) bool {
|
||||||
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
|
||||||
|
_, exists := m.backedUpItems[key]
|
||||||
|
return exists
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *backedUpItemsMap) AddItem(key itemKey) {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
m.backedUpItems[key] = struct{}{}
|
||||||
|
}
|
|
@ -297,8 +297,6 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
backupRequest.BackedUpItems = map[itemKey]struct{}{}
|
|
||||||
|
|
||||||
podVolumeTimeout := kb.podVolumeTimeout
|
podVolumeTimeout := kb.podVolumeTimeout
|
||||||
if val := backupRequest.Annotations[velerov1api.PodVolumeOperationTimeoutAnnotation]; val != "" {
|
if val := backupRequest.Annotations[velerov1api.PodVolumeOperationTimeoutAnnotation]; val != "" {
|
||||||
parsed, err := time.ParseDuration(val)
|
parsed, err := time.ParseDuration(val)
|
||||||
|
@ -499,12 +497,13 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
|
||||||
|
|
||||||
// updated total is computed as "how many items we've backed up so far, plus
|
// updated total is computed as "how many items we've backed up so far, plus
|
||||||
// how many items we know of that are remaining"
|
// how many items we know of that are remaining"
|
||||||
totalItems := len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
|
backedUpItems := backupRequest.BackedUpItems.Len()
|
||||||
|
totalItems := backedUpItems + (len(items) - (i + 1))
|
||||||
|
|
||||||
// send a progress update
|
// send a progress update
|
||||||
update <- progressUpdate{
|
update <- progressUpdate{
|
||||||
totalItems: totalItems,
|
totalItems: totalItems,
|
||||||
itemsBackedUp: len(backupRequest.BackedUpItems),
|
itemsBackedUp: backedUpItems,
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithFields(map[string]interface{}{
|
log.WithFields(map[string]interface{}{
|
||||||
|
@ -512,7 +511,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
|
||||||
"resource": items[i].groupResource.String(),
|
"resource": items[i].groupResource.String(),
|
||||||
"namespace": items[i].namespace,
|
"namespace": items[i].namespace,
|
||||||
"name": items[i].name,
|
"name": items[i].name,
|
||||||
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", len(backupRequest.BackedUpItems), totalItems)
|
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems)
|
||||||
}
|
}
|
||||||
|
|
||||||
// no more progress updates will be sent on the 'update' channel
|
// no more progress updates will be sent on the 'update' channel
|
||||||
|
@ -538,8 +537,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
|
||||||
if updated.Status.Progress == nil {
|
if updated.Status.Progress == nil {
|
||||||
updated.Status.Progress = &velerov1api.BackupProgress{}
|
updated.Status.Progress = &velerov1api.BackupProgress{}
|
||||||
}
|
}
|
||||||
updated.Status.Progress.TotalItems = len(backupRequest.BackedUpItems)
|
backedUpItems := backupRequest.BackedUpItems.Len()
|
||||||
updated.Status.Progress.ItemsBackedUp = len(backupRequest.BackedUpItems)
|
updated.Status.Progress.TotalItems = backedUpItems
|
||||||
|
updated.Status.Progress.ItemsBackedUp = backedUpItems
|
||||||
|
|
||||||
// update the hooks execution status
|
// update the hooks execution status
|
||||||
if updated.Status.HookStatus == nil {
|
if updated.Status.HookStatus == nil {
|
||||||
|
@ -558,8 +558,8 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
|
||||||
log.Infof("Summary for skipped PVs: %s", skippedPVSummary)
|
log.Infof("Summary for skipped PVs: %s", skippedPVSummary)
|
||||||
}
|
}
|
||||||
|
|
||||||
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(backupRequest.BackedUpItems), ItemsBackedUp: len(backupRequest.BackedUpItems)}
|
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: backedUpItems, ItemsBackedUp: backedUpItems}
|
||||||
log.WithField("progress", "").Infof("Backed up a total of %d items", len(backupRequest.BackedUpItems))
|
log.WithField("progress", "").Infof("Backed up a total of %d items", backedUpItems)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -667,7 +667,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Don't run hooks if pod has already been backed up
|
// Don't run hooks if pod has already been backed up
|
||||||
if _, exists := itemBlock.itemBackupper.backupRequest.BackedUpItems[key]; !exists {
|
if !itemBlock.itemBackupper.backupRequest.BackedUpItems.Has(key) {
|
||||||
preHookPods = append(preHookPods, item)
|
preHookPods = append(preHookPods, item)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -681,7 +681,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
|
||||||
itemBlock.Log.WithError(errors.WithStack(err)).Error("Error accessing pod metadata")
|
itemBlock.Log.WithError(errors.WithStack(err)).Error("Error accessing pod metadata")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
itemBlock.itemBackupper.backupRequest.BackedUpItems[key] = struct{}{}
|
itemBlock.itemBackupper.backupRequest.BackedUpItems.AddItem(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
itemBlock.Log.Debug("Backing up items in BackupItemBlock")
|
itemBlock.Log.Debug("Backing up items in BackupItemBlock")
|
||||||
|
@ -861,8 +861,6 @@ func (kb *kubernetesBackupper) FinalizeBackup(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
backupRequest.BackedUpItems = map[itemKey]struct{}{}
|
|
||||||
|
|
||||||
// set up a temp dir for the itemCollector to use to temporarily
|
// set up a temp dir for the itemCollector to use to temporarily
|
||||||
// store items as they're scraped from the API.
|
// store items as they're scraped from the API.
|
||||||
tempDir, err := os.MkdirTemp("", "")
|
tempDir, err := os.MkdirTemp("", "")
|
||||||
|
@ -947,14 +945,15 @@ func (kb *kubernetesBackupper) FinalizeBackup(
|
||||||
|
|
||||||
// updated total is computed as "how many items we've backed up so far, plus
|
// updated total is computed as "how many items we've backed up so far, plus
|
||||||
// how many items we know of that are remaining"
|
// how many items we know of that are remaining"
|
||||||
totalItems := len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
|
backedUpItems := backupRequest.BackedUpItems.Len()
|
||||||
|
totalItems := backedUpItems + (len(items) - (i + 1))
|
||||||
|
|
||||||
log.WithFields(map[string]interface{}{
|
log.WithFields(map[string]interface{}{
|
||||||
"progress": "",
|
"progress": "",
|
||||||
"resource": item.groupResource.String(),
|
"resource": item.groupResource.String(),
|
||||||
"namespace": item.namespace,
|
"namespace": item.namespace,
|
||||||
"name": item.name,
|
"name": item.name,
|
||||||
}).Infof("Updated %d items out of an estimated total of %d (estimate will change throughout the backup finalizer)", len(backupRequest.BackedUpItems), totalItems)
|
}).Infof("Updated %d items out of an estimated total of %d (estimate will change throughout the backup finalizer)", backedUpItems, totalItems)
|
||||||
}
|
}
|
||||||
|
|
||||||
volumeInfos, err := backupStore.GetBackupVolumeInfos(backupRequest.Backup.Name)
|
volumeInfos, err := backupStore.GetBackupVolumeInfos(backupRequest.Backup.Name)
|
||||||
|
@ -979,7 +978,7 @@ func (kb *kubernetesBackupper) FinalizeBackup(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithField("progress", "").Infof("Updated a total of %d items", len(backupRequest.BackedUpItems))
|
log.WithField("progress", "").Infof("Updated a total of %d items", backupRequest.BackedUpItems.Len())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,6 +76,7 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) {
|
||||||
req := &Request{
|
req := &Request{
|
||||||
Backup: defaultBackup().Result(),
|
Backup: defaultBackup().Result(),
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
|
|
||||||
backupFile := bytes.NewBuffer([]byte{})
|
backupFile := bytes.NewBuffer([]byte{})
|
||||||
|
@ -103,7 +104,7 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) {
|
||||||
// go through BackedUpItems after the backup to assemble the list of files we
|
// go through BackedUpItems after the backup to assemble the list of files we
|
||||||
// expect to see in the tarball and compare to see if they match
|
// expect to see in the tarball and compare to see if they match
|
||||||
var expectedFiles []string
|
var expectedFiles []string
|
||||||
for item := range req.BackedUpItems {
|
for item := range req.BackedUpItems.CopyItemMap() {
|
||||||
file := "resources/" + gvkToResource[item.resource]
|
file := "resources/" + gvkToResource[item.resource]
|
||||||
if item.namespace != "" {
|
if item.namespace != "" {
|
||||||
file = file + "/namespaces/" + item.namespace
|
file = file + "/namespaces/" + item.namespace
|
||||||
|
@ -135,6 +136,7 @@ func TestBackupProgressIsUpdated(t *testing.T) {
|
||||||
req := &Request{
|
req := &Request{
|
||||||
Backup: defaultBackup().Result(),
|
Backup: defaultBackup().Result(),
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile := bytes.NewBuffer([]byte{})
|
backupFile := bytes.NewBuffer([]byte{})
|
||||||
|
|
||||||
|
@ -159,8 +161,8 @@ func TestBackupProgressIsUpdated(t *testing.T) {
|
||||||
h.backupper.Backup(h.log, req, backupFile, nil, nil, nil)
|
h.backupper.Backup(h.log, req, backupFile, nil, nil, nil)
|
||||||
|
|
||||||
require.NotNil(t, req.Status.Progress)
|
require.NotNil(t, req.Status.Progress)
|
||||||
assert.Len(t, req.BackedUpItems, req.Status.Progress.TotalItems)
|
assert.Equal(t, req.BackedUpItems.Len(), req.Status.Progress.TotalItems)
|
||||||
assert.Len(t, req.BackedUpItems, req.Status.Progress.ItemsBackedUp)
|
assert.Equal(t, req.BackedUpItems.Len(), req.Status.Progress.ItemsBackedUp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestBackupOldResourceFiltering runs backups with different combinations
|
// TestBackupOldResourceFiltering runs backups with different combinations
|
||||||
|
@ -871,6 +873,7 @@ func TestBackupOldResourceFiltering(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -1048,6 +1051,7 @@ func TestCRDInclusion(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -1143,6 +1147,7 @@ func TestBackupResourceCohabitation(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -1169,6 +1174,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
|
||||||
backup1 := &Request{
|
backup1 := &Request{
|
||||||
Backup: defaultBackup().Result(),
|
Backup: defaultBackup().Result(),
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backup1File := bytes.NewBuffer([]byte{})
|
backup1File := bytes.NewBuffer([]byte{})
|
||||||
|
|
||||||
|
@ -1183,6 +1189,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
|
||||||
backup2 := &Request{
|
backup2 := &Request{
|
||||||
Backup: defaultBackup().Result(),
|
Backup: defaultBackup().Result(),
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backup2File := bytes.NewBuffer([]byte{})
|
backup2File := bytes.NewBuffer([]byte{})
|
||||||
|
|
||||||
|
@ -1233,6 +1240,7 @@ func TestBackupResourceOrdering(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -1349,6 +1357,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) {
|
||||||
backupReq: &Request{
|
backupReq: &Request{
|
||||||
Backup: defaultBackup().SnapshotVolumes(false).Result(),
|
Backup: defaultBackup().SnapshotVolumes(false).Result(),
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
resPolicies: &resourcepolicies.ResourcePolicies{
|
resPolicies: &resourcepolicies.ResourcePolicies{
|
||||||
Version: "v1",
|
Version: "v1",
|
||||||
|
@ -1395,6 +1404,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) {
|
||||||
},
|
},
|
||||||
includedPVs: map[string]struct{}{},
|
includedPVs: map[string]struct{}{},
|
||||||
},
|
},
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.PVCs(
|
test.PVCs(
|
||||||
|
@ -1641,6 +1651,7 @@ func TestBackupActionsRunForCorrectItems(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -1722,6 +1733,7 @@ func TestBackupWithInvalidActions(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -1872,6 +1884,7 @@ func TestBackupActionModifications(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -2128,6 +2141,7 @@ func TestBackupActionAdditionalItems(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -2385,6 +2399,7 @@ func TestItemBlockActionsRunForCorrectItems(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -2466,6 +2481,7 @@ func TestBackupWithInvalidItemBlockActions(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -2718,6 +2734,7 @@ func TestItemBlockActionRelatedItems(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -2882,6 +2899,7 @@ func TestBackupWithSnapshots(t *testing.T) {
|
||||||
newSnapshotLocation("velero", "default", "default"),
|
newSnapshotLocation("velero", "default", "default"),
|
||||||
},
|
},
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.PVs(
|
test.PVs(
|
||||||
|
@ -2916,6 +2934,7 @@ func TestBackupWithSnapshots(t *testing.T) {
|
||||||
newSnapshotLocation("velero", "default", "default"),
|
newSnapshotLocation("velero", "default", "default"),
|
||||||
},
|
},
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.PVs(
|
test.PVs(
|
||||||
|
@ -2951,6 +2970,7 @@ func TestBackupWithSnapshots(t *testing.T) {
|
||||||
newSnapshotLocation("velero", "default", "default"),
|
newSnapshotLocation("velero", "default", "default"),
|
||||||
},
|
},
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.PVs(
|
test.PVs(
|
||||||
|
@ -2986,6 +3006,7 @@ func TestBackupWithSnapshots(t *testing.T) {
|
||||||
newSnapshotLocation("velero", "default", "default"),
|
newSnapshotLocation("velero", "default", "default"),
|
||||||
},
|
},
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.PVs(
|
test.PVs(
|
||||||
|
@ -3021,6 +3042,7 @@ func TestBackupWithSnapshots(t *testing.T) {
|
||||||
newSnapshotLocation("velero", "default", "default"),
|
newSnapshotLocation("velero", "default", "default"),
|
||||||
},
|
},
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.PVs(
|
test.PVs(
|
||||||
|
@ -3054,6 +3076,7 @@ func TestBackupWithSnapshots(t *testing.T) {
|
||||||
newSnapshotLocation("velero", "default", "default"),
|
newSnapshotLocation("velero", "default", "default"),
|
||||||
},
|
},
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.PVs(
|
test.PVs(
|
||||||
|
@ -3070,6 +3093,7 @@ func TestBackupWithSnapshots(t *testing.T) {
|
||||||
req: &Request{
|
req: &Request{
|
||||||
Backup: defaultBackup().Result(),
|
Backup: defaultBackup().Result(),
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.PVs(
|
test.PVs(
|
||||||
|
@ -3089,6 +3113,7 @@ func TestBackupWithSnapshots(t *testing.T) {
|
||||||
newSnapshotLocation("velero", "default", "default"),
|
newSnapshotLocation("velero", "default", "default"),
|
||||||
},
|
},
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.PVs(
|
test.PVs(
|
||||||
|
@ -3106,6 +3131,7 @@ func TestBackupWithSnapshots(t *testing.T) {
|
||||||
newSnapshotLocation("velero", "default", "default"),
|
newSnapshotLocation("velero", "default", "default"),
|
||||||
},
|
},
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.PVs(
|
test.PVs(
|
||||||
|
@ -3126,6 +3152,7 @@ func TestBackupWithSnapshots(t *testing.T) {
|
||||||
newSnapshotLocation("velero", "another", "another"),
|
newSnapshotLocation("velero", "another", "another"),
|
||||||
},
|
},
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.PVs(
|
test.PVs(
|
||||||
|
@ -3256,6 +3283,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
|
||||||
req: &Request{
|
req: &Request{
|
||||||
Backup: defaultBackup().Result(),
|
Backup: defaultBackup().Result(),
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.Pods(
|
test.Pods(
|
||||||
|
@ -3286,6 +3314,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
|
||||||
req: &Request{
|
req: &Request{
|
||||||
Backup: defaultBackup().Result(),
|
Backup: defaultBackup().Result(),
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.Pods(
|
test.Pods(
|
||||||
|
@ -3316,6 +3345,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
|
||||||
req: &Request{
|
req: &Request{
|
||||||
Backup: defaultBackup().Result(),
|
Backup: defaultBackup().Result(),
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
},
|
},
|
||||||
apiResources: []*test.APIResource{
|
apiResources: []*test.APIResource{
|
||||||
test.Pods(
|
test.Pods(
|
||||||
|
@ -3398,6 +3428,7 @@ func TestBackupWithInvalidHooks(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -3868,6 +3899,7 @@ func TestBackupWithHooks(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
podCommandExecutor = new(test.MockPodCommandExecutor)
|
podCommandExecutor = new(test.MockPodCommandExecutor)
|
||||||
|
@ -4071,6 +4103,7 @@ func TestBackupWithPodVolume(t *testing.T) {
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{tc.vsl},
|
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{tc.vsl},
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -5181,6 +5214,7 @@ func TestBackupNewResourceFiltering(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
@ -5342,6 +5376,7 @@ func TestBackupNamespaces(t *testing.T) {
|
||||||
req = &Request{
|
req = &Request{
|
||||||
Backup: tc.backup,
|
Backup: tc.backup,
|
||||||
SkippedPVTracker: NewSkipPVTracker(),
|
SkippedPVTracker: NewSkipPVTracker(),
|
||||||
|
BackedUpItems: NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
backupFile = bytes.NewBuffer([]byte{})
|
backupFile = bytes.NewBuffer([]byte{})
|
||||||
)
|
)
|
||||||
|
|
|
@ -175,12 +175,12 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
|
||||||
name: name,
|
name: name,
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, exists := ib.backupRequest.BackedUpItems[key]; exists {
|
if ib.backupRequest.BackedUpItems.Has(key) {
|
||||||
log.Info("Skipping item because it's already been backed up.")
|
log.Info("Skipping item because it's already been backed up.")
|
||||||
// returning true since this item *is* in the backup, even though we're not backing it up here
|
// returning true since this item *is* in the backup, even though we're not backing it up here
|
||||||
return true, itemFiles, nil
|
return true, itemFiles, nil
|
||||||
}
|
}
|
||||||
ib.backupRequest.BackedUpItems[key] = struct{}{}
|
ib.backupRequest.BackedUpItems.AddItem(key)
|
||||||
log.Info("Backing up item")
|
log.Info("Backing up item")
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -17,9 +17,6 @@ limitations under the License.
|
||||||
package backup
|
package backup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"sort"
|
|
||||||
|
|
||||||
"github.com/vmware-tanzu/velero/internal/hook"
|
"github.com/vmware-tanzu/velero/internal/hook"
|
||||||
"github.com/vmware-tanzu/velero/internal/resourcepolicies"
|
"github.com/vmware-tanzu/velero/internal/resourcepolicies"
|
||||||
"github.com/vmware-tanzu/velero/internal/volume"
|
"github.com/vmware-tanzu/velero/internal/volume"
|
||||||
|
@ -49,7 +46,7 @@ type Request struct {
|
||||||
ResolvedItemBlockActions []framework.ItemBlockResolvedAction
|
ResolvedItemBlockActions []framework.ItemBlockResolvedAction
|
||||||
VolumeSnapshots []*volume.Snapshot
|
VolumeSnapshots []*volume.Snapshot
|
||||||
PodVolumeBackups []*velerov1api.PodVolumeBackup
|
PodVolumeBackups []*velerov1api.PodVolumeBackup
|
||||||
BackedUpItems map[itemKey]struct{}
|
BackedUpItems *backedUpItemsMap
|
||||||
itemOperationsList *[]*itemoperation.BackupOperation
|
itemOperationsList *[]*itemoperation.BackupOperation
|
||||||
ResPolicies *resourcepolicies.Policies
|
ResPolicies *resourcepolicies.Policies
|
||||||
SkippedPVTracker *skipPVTracker
|
SkippedPVTracker *skipPVTracker
|
||||||
|
@ -71,21 +68,7 @@ func (r *Request) GetItemOperationsList() *[]*itemoperation.BackupOperation {
|
||||||
// BackupResourceList returns the list of backed up resources grouped by the API
|
// BackupResourceList returns the list of backed up resources grouped by the API
|
||||||
// Version and Kind
|
// Version and Kind
|
||||||
func (r *Request) BackupResourceList() map[string][]string {
|
func (r *Request) BackupResourceList() map[string][]string {
|
||||||
resources := map[string][]string{}
|
return r.BackedUpItems.ResourceMap()
|
||||||
for i := range r.BackedUpItems {
|
|
||||||
entry := i.name
|
|
||||||
if i.namespace != "" {
|
|
||||||
entry = fmt.Sprintf("%s/%s", i.namespace, i.name)
|
|
||||||
}
|
|
||||||
resources[i.resource] = append(resources[i.resource], entry)
|
|
||||||
}
|
|
||||||
|
|
||||||
// sort namespace/name entries for each GVK
|
|
||||||
for _, v := range resources {
|
|
||||||
sort.Strings(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
return resources
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Request) FillVolumesInformation() {
|
func (r *Request) FillVolumesInformation() {
|
||||||
|
|
|
@ -44,9 +44,9 @@ func TestRequest_BackupResourceList(t *testing.T) {
|
||||||
name: "my-pv",
|
name: "my-pv",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
backedUpItems := map[itemKey]struct{}{}
|
backedUpItems := NewBackedUpItemsMap()
|
||||||
for _, it := range items {
|
for _, it := range items {
|
||||||
backedUpItems[it] = struct{}{}
|
backedUpItems.AddItem(it)
|
||||||
}
|
}
|
||||||
|
|
||||||
req := Request{BackedUpItems: backedUpItems}
|
req := Request{BackedUpItems: backedUpItems}
|
||||||
|
@ -70,9 +70,9 @@ func TestRequest_BackupResourceListEntriesSorted(t *testing.T) {
|
||||||
namespace: "ns1",
|
namespace: "ns1",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
backedUpItems := map[itemKey]struct{}{}
|
backedUpItems := NewBackedUpItemsMap()
|
||||||
for _, it := range items {
|
for _, it := range items {
|
||||||
backedUpItems[it] = struct{}{}
|
backedUpItems.AddItem(it)
|
||||||
}
|
}
|
||||||
|
|
||||||
req := Request{BackedUpItems: backedUpItems}
|
req := Request{BackedUpItems: backedUpItems}
|
||||||
|
|
|
@ -328,6 +328,7 @@ func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logg
|
||||||
request := &pkgbackup.Request{
|
request := &pkgbackup.Request{
|
||||||
Backup: backup.DeepCopy(), // don't modify items in the cache
|
Backup: backup.DeepCopy(), // don't modify items in the cache
|
||||||
SkippedPVTracker: pkgbackup.NewSkipPVTracker(),
|
SkippedPVTracker: pkgbackup.NewSkipPVTracker(),
|
||||||
|
BackedUpItems: pkgbackup.NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
request.VolumesInformation.Init()
|
request.VolumesInformation.Init()
|
||||||
|
|
||||||
|
|
|
@ -160,6 +160,7 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
|
||||||
Backup: backup,
|
Backup: backup,
|
||||||
StorageLocation: location,
|
StorageLocation: location,
|
||||||
SkippedPVTracker: pkgbackup.NewSkipPVTracker(),
|
SkippedPVTracker: pkgbackup.NewSkipPVTracker(),
|
||||||
|
BackedUpItems: pkgbackup.NewBackedUpItemsMap(),
|
||||||
}
|
}
|
||||||
var outBackupFile *os.File
|
var outBackupFile *os.File
|
||||||
if len(operations) > 0 {
|
if len(operations) > 0 {
|
||||||
|
|
Loading…
Reference in New Issue