Implement parallel ItemBlock processing via backup_controller goroutines

Signed-off-by: Scott Seago <sseago@redhat.com>
pull/8659/head
Scott Seago 2025-01-16 18:07:59 -05:00
parent 79707aaa60
commit fcfb2fd9ee
12 changed files with 371 additions and 70 deletions

@ -0,0 +1 @@
Implement parallel ItemBlock processing via backup_controller goroutines

@ -191,25 +191,25 @@ type ItemBlockWorkerPool struct {
}
type ItemBlockInput struct {
	itemBlock *BackupItemBlock
	returnChan chan ItemBlockReturn
}
type ItemBlockReturn struct {
	itemBlock *BackupItemBlock
	resources []schema.GroupResource
	err error
}
func (p *ItemBlockWorkerPool) getInputChannel() chan ItemBlockInput
func StartItemBlockWorkerPool(context context.Context, workers int, logger logrus.FieldLogger) ItemBlockWorkerPool
func processItemBlockWorker(context context.Context, itemBlockChannel chan ItemBlockInput, logger logrus.FieldLogger, wg *sync.WaitGroup)
```

The worker pool will be started by calling `StartItemBlockWorkerPool` in `NewBackupReconciler()`, passing in the worker count and reconciler context.
`backupReconciler.prepareBackupRequest` will also add the input channel to the `backupRequest` so that it will be available during backup processing.
The func `StartItemBlockWorkerPool` will create the `ItemBlockWorkerPool` with a shared buffered input channel (fixed buffer size) and start `workers` goroutines, each of which will call `processItemBlockWorker`.
The `processItemBlockWorker` func (run by the worker goroutines) will read from `itemBlockChannel`, call `BackupItemBlock` on the retrieved `ItemBlock`, send the return value on the retrieved `returnChan`, and then process the next block.

#### Modify ItemBlock processing loop to send ItemBlocks to the worker pool rather than backing them up directly
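A rough, self-contained sketch of that hand-off (simplified stand-in types, not Velero's actual `ItemBlockInput`/`ItemBlockReturn`; the worker count and buffer size are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Simplified stand-ins for the design's ItemBlockInput / ItemBlockReturn.
type itemBlockInput struct {
	name       string
	returnChan chan itemBlockReturn
}

type itemBlockReturn struct {
	name string
	err  error
}

// processItemBlockWorker-style loop: take work from the shared input channel,
// "back up" the block, and send the result to the caller's return channel.
func worker(ctx context.Context, in chan itemBlockInput, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case m := <-in:
			m.returnChan <- itemBlockReturn{name: m.name} // stand-in for BackupItemBlock
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan itemBlockInput, 10) // shared buffered input channel
	wg := &sync.WaitGroup{}
	for i := 0; i < 4; i++ { // start the worker goroutines
		wg.Add(1)
		go worker(ctx, in, wg)
	}

	// Processing-loop side: dispatch a block, then collect its result.
	ret := make(chan itemBlockReturn, 1)
	in <- itemBlockInput{name: "pods/ns1/my-app", returnChan: ret}
	fmt.Println("processed:", (<-ret).name)

	cancel()  // stop the workers
	wg.Wait() // wait for them to exit
}
```

The real loop additionally tracks outstanding blocks with a `sync.WaitGroup` and funnels every `ItemBlockReturn` through a single response-handling goroutine, as the `backup.go` changes below show.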

@ -26,12 +26,14 @@ import (
type backedUpItemsMap struct {
*sync.RWMutex
backedUpItems map[itemKey]struct{}
totalItems map[itemKey]struct{}
}
func NewBackedUpItemsMap() *backedUpItemsMap {
return &backedUpItemsMap{
RWMutex: &sync.RWMutex{},
backedUpItems: make(map[itemKey]struct{}),
totalItems: make(map[itemKey]struct{}),
}
}
@ -75,6 +77,12 @@ func (m *backedUpItemsMap) Len() int {
return len(m.backedUpItems)
}
func (m *backedUpItemsMap) BackedUpAndTotalLen() (int, int) {
m.RLock()
defer m.RUnlock()
return len(m.backedUpItems), len(m.totalItems)
}
func (m *backedUpItemsMap) Has(key itemKey) bool {
m.RLock()
defer m.RUnlock()
@ -87,4 +95,11 @@ func (m *backedUpItemsMap) AddItem(key itemKey) {
m.Lock()
defer m.Unlock()
m.backedUpItems[key] = struct{}{}
m.totalItems[key] = struct{}{}
}
func (m *backedUpItemsMap) AddItemToTotal(key itemKey) {
m.Lock()
defer m.Unlock()
m.totalItems[key] = struct{}{}
}
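The separate `totalItems` map lets progress reporting use a denominator that keeps growing as new items are discovered, while `backedUpItems` stays the numerator, and `BackedUpAndTotalLen` reads both under one lock so a progress update sees a consistent pair. A minimal, self-contained sketch of that accounting (simplified key type; illustrative only, not the Velero implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// itemKey-style identity for an item (resource, namespace, name).
type key struct{ resource, namespace, name string }

// progressMap mirrors the backedUpItemsMap idea: one map of items we know
// about (total) and one map of items already written to the backup.
type progressMap struct {
	sync.RWMutex
	backedUp map[key]struct{}
	total    map[key]struct{}
}

// AddItemToTotal registers an item that has been collected but not yet backed up.
func (m *progressMap) AddItemToTotal(k key) {
	m.Lock()
	defer m.Unlock()
	m.total[k] = struct{}{}
}

// AddItem marks an item as backed up (and makes sure it counts toward the total).
func (m *progressMap) AddItem(k key) {
	m.Lock()
	defer m.Unlock()
	m.backedUp[k] = struct{}{}
	m.total[k] = struct{}{}
}

// BackedUpAndTotalLen returns the numerator and denominator for a progress update.
func (m *progressMap) BackedUpAndTotalLen() (int, int) {
	m.RLock()
	defer m.RUnlock()
	return len(m.backedUp), len(m.total)
}

func main() {
	m := &progressMap{backedUp: map[key]struct{}{}, total: map[key]struct{}{}}
	m.AddItemToTotal(key{"v1/Pod", "ns", "pod-1"})
	m.AddItemToTotal(key{"v1/Pod", "ns", "pod-2"})
	m.AddItem(key{"v1/Pod", "ns", "pod-1"})
	done, total := m.BackedUpAndTotalLen()
	fmt.Printf("Backed up %d items out of an estimated total of %d\n", done, total)
}
```

In the backup changes below, items are added to the total as they are collected and counted as backed up when their ItemBlock completes.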

@ -26,6 +26,7 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -238,7 +239,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
gzippedData := gzip.NewWriter(backupFile)
defer gzippedData.Close()
tw := NewTarWriter(tar.NewWriter(gzippedData))
defer tw.Close()
log.Info("Writing backup version file")
@ -380,6 +381,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
boolptr.IsSetToTrue(backupRequest.Spec.DefaultVolumesToFsBackup),
!backupRequest.ResourceIncludesExcludes.ShouldInclude(kuberesource.PersistentVolumeClaims.String()),
),
kubernetesBackupper: kb,
}
// helper struct to send current progress between the main
@ -431,6 +433,8 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
}
}()
responseCtx, responseCancel := context.WithCancel(context.Background())
backedUpGroupResources := map[schema.GroupResource]bool{}
// Maps items in the item list from GR+NamespacedName to a slice of pointers to kubernetesResources
// We need the slice value since if the EnableAPIGroupVersions feature flag is set, there may
@ -443,9 +447,60 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
Name: items[i].name,
}
itemsMap[key] = append(itemsMap[key], items[i])
// add to total items for progress reporting
if items[i].kind != "" {
backupRequest.BackedUpItems.AddItemToTotal(itemKey{
resource: fmt.Sprintf("%s/%s", items[i].preferredGVR.GroupVersion().String(), items[i].kind),
namespace: items[i].namespace,
name: items[i].name,
})
}
}
var itemBlock *BackupItemBlock var itemBlock *BackupItemBlock
itemBlockReturn := make(chan ItemBlockReturn, 100)
wg := &sync.WaitGroup{}
// Handle returns from worker pool processing ItemBlocks
go func() {
for {
select {
case response := <-itemBlockReturn: // process each BackupItemBlock response
func() {
defer wg.Done()
if response.err != nil {
log.WithError(errors.WithStack((response.err))).Error("Got error in BackupItemBlock.")
}
for _, backedUpGR := range response.resources {
backedUpGroupResources[backedUpGR] = true
}
// We could eventually track which itemBlocks have finished
// using response.itemBlock
// updated total is computed as "how many items we've backed up so far,
// plus how many items are processed but not yet backed up plus how many
// we know of that are remaining to be processed"
backedUpItems, totalItems := backupRequest.BackedUpItems.BackedUpAndTotalLen()
// send a progress update
update <- progressUpdate{
totalItems: totalItems,
itemsBackedUp: backedUpItems,
}
if len(response.itemBlock.Items) > 0 {
log.WithFields(map[string]any{
"progress": "",
"kind": response.itemBlock.Items[0].Item.GroupVersionKind().GroupKind().String(),
"namespace": response.itemBlock.Items[0].Item.GetNamespace(),
"name": response.itemBlock.Items[0].Item.GetName(),
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems)
}
}()
case <-responseCtx.Done():
return
}
}
}()
for i := range items {
log.WithFields(map[string]any{
@ -491,32 +546,32 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
wg.Add(1)
backupRequest.ItemBlockChannel <- ItemBlockInput{
itemBlock: itemBlock,
returnChan: itemBlockReturn,
}
itemBlock = nil
}
}
done := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
}()
// Wait for all the ItemBlocks to be processed
select {
case <-done:
log.Info("done processing ItemBlocks")
case <-responseCtx.Done():
log.Info("ItemBlock processing canceled")
}
// cancel response-processing goroutine
responseCancel()
// no more progress updates will be sent on the 'update' channel
quit <- struct{}{}
@ -663,7 +718,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions(
}
}
func (kb *kubernetesBackupper) backupItemBlock(itemBlock *BackupItemBlock) []schema.GroupResource {
// find pods in ItemBlock
// filter pods based on whether they still need to be backed up
// this list will be used to run pre/post hooks
@ -697,7 +752,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
itemBlock.Log.Debug("Backing up items in BackupItemBlock")
var grList []schema.GroupResource
for _, item := range itemBlock.Items {
if backedUp := kb.backupItem(itemBlock.Log, item.Gr, itemBlock.itemBackupper, item.Item, item.PreferredGVR, itemBlock); backedUp {
grList = append(grList, item.Gr)
}
}
@ -724,7 +779,7 @@ func (kb *kubernetesBackupper) getItemKey(item itemblock.ItemBlockItem) (itemKey
return key, nil
}
func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock *BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
var successPods []itemblock.ItemBlockItem
var failedPods []itemblock.ItemBlockItem
var errs []error
@ -741,7 +796,7 @@ func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock
}
// The hooks cannot execute until the PVBs to be processed
func (kb *kubernetesBackupper) handleItemBlockPostHooks(itemBlock *BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
log := itemBlock.Log
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()
@ -760,7 +815,7 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(itemBlock BackupItemBloc
}
// wait all PVBs of the item block pods to be processed
func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock *BackupItemBlock, pods []itemblock.ItemBlockItem) error {
pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for _, pod := range pods {
namespace, name := pod.Item.GetNamespace(), pod.Item.GetName()
@ -883,7 +938,7 @@ func (kb *kubernetesBackupper) backupCRD(log logrus.FieldLogger, gr schema.Group
kb.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr, nil)
}
func (kb *kubernetesBackupper) writeBackupVersion(tw tarWriter) error {
versionFile := filepath.Join(velerov1api.MetadataDir, "version")
versionString := fmt.Sprintf("%s\n", BackupFormatVersion)
@ -914,7 +969,7 @@ func (kb *kubernetesBackupper) FinalizeBackup(
) error {
gzw := gzip.NewWriter(outBackupFile)
defer gzw.Close()
tw := NewTarWriter(tar.NewWriter(gzw))
defer tw.Close()
gzr, err := gzip.NewReader(inBackupFile)
@ -968,6 +1023,7 @@ func (kb *kubernetesBackupper) FinalizeBackup(
itemHookHandler: &hook.NoOpItemHookHandler{},
podVolumeSnapshotTracker: podvolume.NewTracker(),
hookTracker: hook.NewHookTracker(),
kubernetesBackupper: kb,
}
updateFiles := make(map[string]FileForArchive)
backedUpGroupResources := map[schema.GroupResource]bool{}
@ -1053,7 +1109,9 @@ func (kb *kubernetesBackupper) FinalizeBackup(
return nil
}
func buildFinalTarball(tr *tar.Reader, tw tarWriter, updateFiles map[string]FileForArchive) error {
tw.Lock()
defer tw.Unlock()
for {
header, err := tr.Next()
if err == io.EOF {
@ -1104,10 +1162,16 @@ func buildFinalTarball(tr *tar.Reader, tw *tar.Writer, updateFiles map[string]Fi
return nil
}
type tarWriter struct {
*tar.Writer
*sync.Mutex
}
func NewTarWriter(writer *tar.Writer) tarWriter {
return tarWriter{
Writer: writer,
Mutex: &sync.Mutex{},
}
}
// updateVolumeInfos update the VolumeInfos according to the AsyncOperations
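Because ItemBlock workers now write entries into the single backup tarball concurrently, `tarWriter` becomes a mutex-guarded wrapper around `*tar.Writer`, and callers such as `itemBackupper.backupItem` and `buildFinalTarball` hold the lock while they write. A minimal, self-contained illustration of that pattern (stand-in names, not the Velero type):

```go
package main

import (
	"archive/tar"
	"bytes"
	"fmt"
	"sync"
)

// lockedTarWriter mimics the new tarWriter struct: embed the writer and a
// mutex so concurrent workers can serialize access to the one tar stream.
type lockedTarWriter struct {
	*tar.Writer
	*sync.Mutex
}

func newLockedTarWriter(w *tar.Writer) lockedTarWriter {
	return lockedTarWriter{Writer: w, Mutex: &sync.Mutex{}}
}

func main() {
	buf := &bytes.Buffer{}
	tw := newLockedTarWriter(tar.NewWriter(buf))

	wg := &sync.WaitGroup{}
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			body := []byte(fmt.Sprintf("item-%d", i))
			// Hold the lock for the whole header+data pair; errors elided for brevity.
			tw.Lock()
			defer tw.Unlock()
			_ = tw.WriteHeader(&tar.Header{Name: fmt.Sprintf("item-%d.json", i), Mode: 0o644, Size: int64(len(body))})
			_, _ = tw.Write(body)
		}(i)
	}
	wg.Wait()
	_ = tw.Close()
	fmt.Println("archive size in bytes:", buf.Len())
}
```

Holding the lock across the whole header-plus-data pair is what keeps one goroutine's entry from interleaving with another's.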

@ -72,11 +72,14 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) {
"v1/PersistentVolume": "persistentvolumes", "v1/PersistentVolume": "persistentvolumes",
} }
h := newHarness(t) h := newHarness(t, nil)
defer h.itemBlockPool.Stop()
req := &Request{ req := &Request{
Backup: defaultBackup().Result(), Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(), SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(), BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: h.itemBlockPool.GetInputChannel(),
} }
backupFile := bytes.NewBuffer([]byte{}) backupFile := bytes.NewBuffer([]byte{})
@ -132,11 +135,13 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) {
// backed up. It validates this by comparing their values to the length of
// the request's BackedUpItems field.
func TestBackupProgressIsUpdated(t *testing.T) {
h := newHarness(t, nil)
defer h.itemBlockPool.Stop()
req := &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: h.itemBlockPool.GetInputChannel(),
}
backupFile := bytes.NewBuffer([]byte{})
@ -866,14 +871,17 @@ func TestBackupOldResourceFiltering(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1044,14 +1052,17 @@ func TestCRDInclusion(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1140,14 +1151,17 @@ func TestBackupResourceCohabitation(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1168,13 +1182,15 @@ func TestBackupResourceCohabitation(t *testing.T) {
// backed up in each backup. Verification is done by looking at the contents of the backup
// tarball. This covers a specific issue that was fixed by https://github.com/vmware-tanzu/velero/pull/485.
func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
h := newHarness(t, nil)
defer h.itemBlockPool.Stop()
// run and verify backup 1
backup1 := &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: h.itemBlockPool.GetInputChannel(),
}
backup1File := bytes.NewBuffer([]byte{})
@ -1190,6 +1206,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: h.itemBlockPool.GetInputChannel(),
}
backup2File := bytes.NewBuffer([]byte{})
@ -1233,14 +1250,17 @@ func TestBackupResourceOrdering(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1341,6 +1361,9 @@ func (a *recordResourcesAction) WithSkippedCSISnapshotFlag(flag bool) *recordRes
// TestBackupItemActionsForSkippedPV runs backups with backup item actions, and
// verifies that the data in SkippedPVTracker is updated as expected.
func TestBackupItemActionsForSkippedPV(t *testing.T) {
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
tests := []struct {
name string
backupReq *Request
@ -1358,6 +1381,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) {
Backup: defaultBackup().SnapshotVolumes(false).Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
resPolicies: &resourcepolicies.ResourcePolicies{
Version: "v1",
@ -1404,7 +1428,8 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) {
},
includedPVs: map[string]struct{}{},
},
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.PVCs(
@ -1430,7 +1455,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(tt *testing.T) {
var (
h = newHarness(t, itemBlockPool)
backupFile = bytes.NewBuffer([]byte{})
fakeClient = test.NewFakeControllerRuntimeClient(t, tc.runtimeResources...)
)
@ -1644,14 +1669,17 @@ func TestBackupActionsRunForCorrectItems(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1726,14 +1754,17 @@ func TestBackupWithInvalidActions(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -1877,14 +1908,17 @@ func TestBackupActionModifications(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -2134,14 +2168,17 @@ func TestBackupActionAdditionalItems(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -2392,14 +2429,17 @@ func TestItemBlockActionsRunForCorrectItems(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -2474,14 +2514,17 @@ func TestBackupWithInvalidItemBlockActions(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -2727,14 +2770,17 @@ func TestItemBlockActionRelatedItems(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -2883,6 +2929,8 @@ func (*fakeVolumeSnapshotter) DeleteSnapshot(snapshotID string) error {
// struct in place of real volume snapshotters.
func TestBackupWithSnapshots(t *testing.T) {
// TODO: add more verification for skippedPVTracker
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
tests := []struct {
name string
req *Request
@ -2900,6 +2948,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -2935,6 +2984,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -2971,6 +3021,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3007,6 +3058,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3043,6 +3095,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3077,6 +3130,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3094,6 +3148,7 @@ func TestBackupWithSnapshots(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3114,6 +3169,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3132,6 +3188,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3153,6 +3210,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.PVs(
@ -3200,7 +3258,7 @@ func TestBackupWithSnapshots(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
backupFile = bytes.NewBuffer([]byte{})
)
@ -3271,6 +3329,8 @@ func TestBackupWithAsyncOperations(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
tests := []struct {
name string
req *Request
@ -3284,6 +3344,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.Pods(
@ -3315,6 +3376,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.Pods(
@ -3346,6 +3408,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
},
apiResources: []*test.APIResource{
test.Pods(
@ -3362,7 +3425,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
backupFile = bytes.NewBuffer([]byte{})
)
@ -3421,14 +3484,17 @@ func TestBackupWithInvalidHooks(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -3892,14 +3958,17 @@ func TestBackupWithHooks(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
podCommandExecutor = new(test.MockPodCommandExecutor)
@ -4123,15 +4192,18 @@ func TestBackupWithPodVolume(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{tc.vsl},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -4216,8 +4288,9 @@ func (a *pluggableIBA) Name() string {
type harness struct {
*test.APIServer
backupper *kubernetesBackupper
log logrus.FieldLogger
itemBlockPool ItemBlockWorkerPool
}
func (h *harness) addItems(t *testing.T, resource *test.APIResource) {
@ -4241,7 +4314,7 @@ func (h *harness) addItems(t *testing.T, resource *test.APIResource) {
}
}
func newHarness(t *testing.T, itemBlockPool *ItemBlockWorkerPool) *harness {
t.Helper()
apiServer := test.NewAPIServer(t)
@ -4250,6 +4323,9 @@ func newHarness(t *testing.T) *harness {
discoveryHelper, err := discovery.NewHelper(apiServer.DiscoveryClient, log)
require.NoError(t, err)
if itemBlockPool == nil {
itemBlockPool = StartItemBlockWorkerPool(context.Background(), 1, log)
}
return &harness{
APIServer: apiServer,
backupper: &kubernetesBackupper{
@ -4262,7 +4338,8 @@ func newHarness(t *testing.T) *harness {
podVolumeBackupperFactory: new(fakePodVolumeBackupperFactory),
podVolumeTimeout: 60 * time.Second,
},
log: log,
itemBlockPool: *itemBlockPool,
}
}
@ -5235,14 +5312,17 @@ func TestBackupNewResourceFiltering(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@ -5397,14 +5477,17 @@ func TestBackupNamespaces(t *testing.T) {
},
}
itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger())
defer itemBlockPool.Stop()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t, itemBlockPool)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
}
backupFile = bytes.NewBuffer([]byte{})
)

@ -71,6 +71,7 @@ type itemBackupper struct {
podVolumeBackupper podvolume.Backupper
podVolumeSnapshotTracker *podvolume.Tracker
volumeSnapshotterGetter VolumeSnapshotterGetter
kubernetesBackupper *kubernetesBackupper
itemHookHandler hook.ItemHookHandler
snapshotLocationVolumeSnapshotters map[string]vsv1.VolumeSnapshotter
@ -95,6 +96,8 @@ func (ib *itemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstr
if !selectedForBackup || err != nil || len(files) == 0 || finalize {
return selectedForBackup, files, err
}
ib.tarWriter.Lock()
defer ib.tarWriter.Unlock()
for _, file := range files {
if err := ib.tarWriter.WriteHeader(file.Header); err != nil {
return false, []FileForArchive{}, errors.WithStack(err)

@ -0,0 +1,99 @@
/*
Copyright the Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backup
import (
"context"
"sync"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type ItemBlockWorkerPool struct {
inputChannel chan ItemBlockInput
wg *sync.WaitGroup
logger logrus.FieldLogger
cancelFunc context.CancelFunc
}
type ItemBlockInput struct {
itemBlock *BackupItemBlock
returnChan chan ItemBlockReturn
}
type ItemBlockReturn struct {
itemBlock *BackupItemBlock
resources []schema.GroupResource
err error
}
func (p *ItemBlockWorkerPool) GetInputChannel() chan ItemBlockInput {
return p.inputChannel
}
func StartItemBlockWorkerPool(ctx context.Context, workers int, log logrus.FieldLogger) *ItemBlockWorkerPool {
// Buffer will hold up to 10 ItemBlocks waiting for processing
inputChannel := make(chan ItemBlockInput, max(workers, 10))
ctx, cancelFunc := context.WithCancel(ctx)
wg := &sync.WaitGroup{}
for i := 0; i < workers; i++ {
logger := log.WithField("worker", i)
wg.Add(1)
go processItemBlockWorker(ctx, inputChannel, logger, wg)
}
pool := &ItemBlockWorkerPool{
inputChannel: inputChannel,
cancelFunc: cancelFunc,
logger: log,
wg: wg,
}
return pool
}
func (p *ItemBlockWorkerPool) Stop() {
p.cancelFunc()
p.logger.Info("ItemBlock worker stopping")
p.wg.Wait()
p.logger.Info("ItemBlock worker stopped")
}
func processItemBlockWorker(ctx context.Context,
inputChannel chan ItemBlockInput,
logger logrus.FieldLogger,
wg *sync.WaitGroup) {
for {
select {
case m := <-inputChannel:
logger.Infof("processing ItemBlock for backup %v", m.itemBlock.itemBackupper.backupRequest.Name)
grList := m.itemBlock.itemBackupper.kubernetesBackupper.backupItemBlock(m.itemBlock)
logger.Infof("finished processing ItemBlock for backup %v", m.itemBlock.itemBackupper.backupRequest.Name)
m.returnChan <- ItemBlockReturn{
itemBlock: m.itemBlock,
resources: grList,
err: nil,
}
case <-ctx.Done():
logger.Info("stopping ItemBlock worker")
wg.Done()
return
}
}
}

@ -179,6 +179,8 @@ type kubernetesResource struct {
// set to true during backup processing when added to an ItemBlock
// or if the item is excluded from backup.
inItemBlockOrExcluded bool
// Kind is added to facilitate creating an itemKey for progress tracking
kind string
}
// getItemsFromResourceIdentifiers get the kubernetesResources
@ -407,6 +409,7 @@ func (r *itemCollector) getResourceItems(
namespace: resourceID.Namespace,
name: resourceID.Name,
path: path,
kind: resource.Kind,
})
}
@ -480,6 +483,7 @@ func (r *itemCollector) getResourceItems(
namespace: item.GetNamespace(),
name: item.GetName(),
path: path,
kind: resource.Kind,
})
if item.GetNamespace() != "" {
@ -806,6 +810,7 @@ func (r *itemCollector) collectNamespaces(
preferredGVR: preferredGVR,
name: unstructuredList.Items[index].GetName(),
path: path,
kind: resource.Kind,
})
}

@ -51,6 +51,7 @@ type Request struct {
ResPolicies *resourcepolicies.Policies
SkippedPVTracker *skipPVTracker
VolumesInformation volume.BackupVolumesInformation
ItemBlockChannel chan ItemBlockInput
}
// BackupVolumesInformation contains the information needs by generating

@ -87,6 +87,7 @@ type backupReconciler struct {
defaultSnapshotMoveData bool
globalCRClient kbclient.Client
itemBlockWorkerCount int
workerPool *pkgbackup.ItemBlockWorkerPool
}
func NewBackupReconciler(
@ -139,6 +140,7 @@ func NewBackupReconciler(
defaultSnapshotMoveData: defaultSnapshotMoveData,
itemBlockWorkerCount: itemBlockWorkerCount,
globalCRClient: globalCRClient,
workerPool: pkgbackup.StartItemBlockWorkerPool(ctx, itemBlockWorkerCount, logger),
}
b.updateTotalBackupMetric()
return b
@ -329,6 +331,7 @@ func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logg
Backup: backup.DeepCopy(), // don't modify items in the cache
SkippedPVTracker: pkgbackup.NewSkipPVTracker(),
BackedUpItems: pkgbackup.NewBackedUpItemsMap(),
ItemBlockChannel: b.workerPool.GetInputChannel(),
}
request.VolumesInformation.Init()

@ -137,7 +137,9 @@ func TestProcessBackupNonProcessedItems(t *testing.T) {
kbClient: velerotest.NewFakeControllerRuntimeClient(t),
formatFlag: formatFlag,
logger: logger,
workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger),
}
defer c.workerPool.Stop()
if test.backup != nil {
require.NoError(t, c.kbClient.Create(context.Background(), test.backup))
}
@ -226,7 +228,9 @@ func TestProcessBackupValidationFailures(t *testing.T) {
clock: &clock.RealClock{},
formatFlag: formatFlag,
metrics: metrics.NewServerMetrics(),
workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger),
}
defer c.workerPool.Stop()
require.NotNil(t, test.backup)
require.NoError(t, c.kbClient.Create(context.Background(), test.backup))
@ -289,7 +293,9 @@ func TestBackupLocationLabel(t *testing.T) {
defaultBackupLocation: test.backupLocation.Name,
clock: &clock.RealClock{},
formatFlag: formatFlag,
workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger),
}
defer c.workerPool.Stop()
res := c.prepareBackupRequest(test.backup, logger)
assert.NotNil(t, res)
@ -384,7 +390,9 @@ func Test_prepareBackupRequest_BackupStorageLocation(t *testing.T) {
defaultBackupTTL: defaultBackupTTL.Duration,
clock: testclocks.NewFakeClock(now),
formatFlag: formatFlag,
workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger),
}
defer c.workerPool.Stop()
test.backup.Spec.StorageLocation = test.backupLocationNameInBackup
@ -460,7 +468,9 @@ func TestDefaultBackupTTL(t *testing.T) {
defaultBackupTTL: defaultBackupTTL.Duration,
clock: testclocks.NewFakeClock(now),
formatFlag: formatFlag,
workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger),
}
defer c.workerPool.Stop()
res := c.prepareBackupRequest(test.backup, logger)
assert.NotNil(t, res)
@ -560,7 +570,9 @@ func TestDefaultVolumesToResticDeprecation(t *testing.T) {
clock: &clock.RealClock{},
formatFlag: formatFlag,
defaultVolumesToFsBackup: test.globalVal,
workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger),
}
defer c.workerPool.Stop()
res := c.prepareBackupRequest(test.backup, logger)
assert.NotNil(t, res)
@ -1345,7 +1357,9 @@ func TestProcessBackupCompletions(t *testing.T) {
backupper: backupper,
formatFlag: formatFlag,
globalCRClient: fakeGlobalClient,
workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger),
}
defer c.workerPool.Stop()
pluginManager.On("GetBackupItemActionsV2").Return(nil, nil)
pluginManager.On("GetItemBlockActions").Return(nil, nil)
@ -1539,7 +1553,9 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
logger: logger,
defaultSnapshotLocations: test.defaultLocations,
kbClient: velerotest.NewFakeControllerRuntimeClient(t),
workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger),
}
defer c.workerPool.Stop()
// set up a Backup object to represent what we expect to be passed to backupper.Backup()
backup := test.backup.DeepCopy()

@ -18,6 +18,7 @@ package podvolume
import (
"fmt"
"sync"
corev1api "k8s.io/api/core/v1"
)
@ -27,6 +28,7 @@ import (
type Tracker struct {
pvcs map[string]pvcSnapshotStatus
pvcPod map[string]string
*sync.RWMutex
}
type pvcSnapshotStatus int
@ -42,7 +44,8 @@ func NewTracker() *Tracker {
return &Tracker{
pvcs: make(map[string]pvcSnapshotStatus),
// key: pvc ns/name, value: pod name
pvcPod: make(map[string]string),
RWMutex: &sync.RWMutex{},
}
}
@ -64,6 +67,8 @@ func (t *Tracker) Optout(pod *corev1api.Pod, volumeName string) {
// OptedoutByPod returns true if the PVC with the specified namespace and name has been opted out by the pod. The
// second return value is the name of the pod which has the annotation that opted out the volume/pvc
func (t *Tracker) OptedoutByPod(namespace, name string) (bool, string) {
t.RLock()
defer t.RUnlock()
status, found := t.pvcs[key(namespace, name)]
if !found || status != pvcSnapshotStatusOptedout {
@ -74,6 +79,8 @@ func (t *Tracker) OptedoutByPod(namespace, name string) (bool, string) {
// if the volume is a PVC, record the status and the related pod
func (t *Tracker) recordStatus(pod *corev1api.Pod, volumeName string, status pvcSnapshotStatus, preReqStatus pvcSnapshotStatus) {
t.Lock()
defer t.Unlock()
for _, volume := range pod.Spec.Volumes {
if volume.Name == volumeName {
if volume.PersistentVolumeClaim != nil {
@ -93,6 +100,8 @@ func (t *Tracker) recordStatus(pod *corev1api.Pod, volumeName string, status pvc
// Has returns true if the PVC with the specified namespace and name has been tracked.
func (t *Tracker) Has(namespace, name string) bool {
t.RLock()
defer t.RUnlock()
status, found := t.pvcs[key(namespace, name)]
return found && (status == pvcSnapshotStatusTracked || status == pvcSnapshotStatusTaken)
}
@ -100,6 +109,8 @@ func (t *Tracker) Has(namespace, name string) bool {
// TakenForPodVolume returns true and the PVC's name if the pod volume with the specified name uses a
// PVC and that PVC has been taken by pod volume backup.
func (t *Tracker) TakenForPodVolume(pod *corev1api.Pod, volume string) (bool, string) {
t.RLock()
defer t.RUnlock()
for _, podVolume := range pod.Spec.Volumes {
if podVolume.Name != volume {
continue