Merge pull request #8517 from ywk253100/241217_hook
[cherry-pick] Fix backup post hook issue
commit f96b1c240b

@@ -0,0 +1 @@
+Fix backup post hook issue #8159 (caused by #7571): always execute backup post hooks after PVBs are handled
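
The shape of the fix, in one place: each ItemBlock's post hooks move onto their own goroutine, that goroutine first waits for the block's PodVolumeBackups (PVBs) to reach a terminal phase, and the hook tracker gains a WaitGroup so Stat() cannot report counts until every such goroutine has finished. Below is a minimal, self-contained sketch of that pattern only; itemBlock, tracker, volumeBackupsDone and runPostHooks are illustrative stand-ins rather than Velero types, and unlike the real change the WaitGroup is incremented before the goroutine is spawned instead of inside the handler.

package main

import (
	"fmt"
	"sync"
	"time"
)

// itemBlock stands in for Velero's BackupItemBlock.
type itemBlock struct{ name string }

// tracker stands in for HookTracker: a counter guarded by a lock, plus a
// WaitGroup counting ItemBlocks whose post hooks are still running.
type tracker struct {
	lock            sync.RWMutex
	hookExecutedCnt int
	AsyncItemBlocks *sync.WaitGroup
}

func newTracker() *tracker {
	return &tracker{AsyncItemBlocks: &sync.WaitGroup{}}
}

// Stat waits for every async post-hook goroutine before reading the counter,
// mirroring the Wait() added at the top of HookTracker.Stat.
func (t *tracker) Stat() int {
	t.AsyncItemBlocks.Wait()
	t.lock.RLock()
	defer t.lock.RUnlock()
	return t.hookExecutedCnt
}

func (t *tracker) record() {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.hookExecutedCnt++
}

// volumeBackupsDone stands in for polling the block's PVBs until they are
// Completed or Failed.
func volumeBackupsDone(b itemBlock) { time.Sleep(50 * time.Millisecond) }

func runPostHooks(t *tracker, b itemBlock) {
	fmt.Println("post hooks executed for", b.name)
	t.record()
}

func main() {
	t := newTracker()
	for _, b := range []itemBlock{{"block-1"}, {"block-2"}} {
		t.AsyncItemBlocks.Add(1)
		go func(b itemBlock) {
			defer t.AsyncItemBlocks.Done()
			volumeBackupsDone(b) // post hooks never run before the PVBs are handled
			runPostHooks(t, b)
		}(b)
	}
	fmt.Println("executed post hooks:", t.Stat()) // blocks until both goroutines finish
}

Calling Wait() before taking the read lock is what lets the backup's final hook statistics include post hooks that were still running when the rest of the item processing finished.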

@@ -69,14 +69,16 @@ type HookTracker struct {
 	// HookExecutedCnt indicates the number of executed hooks.
 	hookExecutedCnt int
 	// hookErrs records hook execution errors if any.
-	hookErrs []HookErrInfo
+	hookErrs        []HookErrInfo
+	AsyncItemBlocks *sync.WaitGroup
 }
 
 // NewHookTracker creates a hookTracker instance.
 func NewHookTracker() *HookTracker {
 	return &HookTracker{
-		lock:    &sync.RWMutex{},
-		tracker: make(map[hookKey]hookStatus),
+		lock:            &sync.RWMutex{},
+		tracker:         make(map[hookKey]hookStatus),
+		AsyncItemBlocks: &sync.WaitGroup{},
 	}
 }
@@ -141,6 +143,8 @@ func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName
 
 // Stat returns the number of attempted hooks and failed hooks
 func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailedCnt int) {
+	ht.AsyncItemBlocks.Wait()
+
 	ht.lock.RLock()
 	defer ht.lock.RUnlock()
 
@@ -34,9 +34,12 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/selection"
 	kubeerrs "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/wait"
 	kbclient "sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/vmware-tanzu/velero/internal/hook"

@@ -474,7 +477,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
 		addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
 		if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
 			log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
-			backedUpGRs := kb.backupItemBlock(*itemBlock)
+			backedUpGRs := kb.backupItemBlock(ctx, *itemBlock)
 			for _, backedUpGR := range backedUpGRs {
 				backedUpGroupResources[backedUpGR] = true
 			}

@@ -633,7 +636,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions(
 	}
 }
 
-func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []schema.GroupResource {
+func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource {
 	// find pods in ItemBlock
 	// filter pods based on whether they still need to be backed up
 	// this list will be used to run pre/post hooks

@@ -656,7 +659,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
 			}
 		}
 	}
-	postHookPods, failedPods, errs := kb.handleItemBlockHooks(itemBlock, preHookPods, hook.PhasePre)
+	postHookPods, failedPods, errs := kb.handleItemBlockPreHooks(itemBlock, preHookPods)
 	for i, pod := range failedPods {
 		itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running pre hooks for pod")
 		// if pre hook fails, flag pod as backed-up and move on

@@ -676,10 +679,9 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
 		}
 	}
 
-	itemBlock.Log.Debug("Executing post hooks")
-	_, failedPods, errs = kb.handleItemBlockHooks(itemBlock, postHookPods, hook.PhasePost)
-	for i, pod := range failedPods {
-		itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
+	if len(postHookPods) > 0 {
+		itemBlock.Log.Debug("Executing post hooks")
+		go kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods)
 	}
 
 	return grList

@@ -698,12 +700,12 @@ func (kb *kubernetesBackupper) itemMetadataAndKey(item itemblock.ItemBlockItem)
 	return metadata, key, nil
 }
 
-func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem, phase hook.HookPhase) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
+func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
 	var successPods []itemblock.ItemBlockItem
 	var failedPods []itemblock.ItemBlockItem
 	var errs []error
 	for _, pod := range hookPods {
-		err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, phase, itemBlock.itemBackupper.hookTracker)
+		err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, hook.PhasePre, itemBlock.itemBackupper.hookTracker)
 		if err == nil {
 			successPods = append(successPods, pod)
 		} else {

@@ -714,6 +716,83 @@ func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, h
 	return successPods, failedPods, errs
 }
 
+// The post hooks cannot be executed until the PVBs have been processed
+func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
+	log := itemBlock.Log
+	itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1)
+	defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()
+
+	if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
+		log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
+		return
+	}
+
+	for _, pod := range hookPods {
+		if err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks,
+			hook.PhasePost, itemBlock.itemBackupper.hookTracker); err != nil {
+			log.WithError(err).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
+		}
+	}
+}
+
+func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
+	requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
+	if err != nil {
+		return errors.Wrapf(err, "failed to create label requirement")
+	}
+	options := &kbclient.ListOptions{
+		LabelSelector: labels.NewSelector().Add(*requirement),
+	}
+	pvbList := &velerov1api.PodVolumeBackupList{}
+	if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
+		return errors.Wrap(err, "failed to list PVBs")
+	}
+
+	podMap := map[string]struct{}{}
+	for _, pod := range pods {
+		podMap[string(pod.Item.GetUID())] = struct{}{}
+	}
+
+	pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
+	for i, pvb := range pvbList.Items {
+		if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
+			continue
+		}
+
+		processed := false
+		if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
+			pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
+			processed = true
+		}
+		pvbMap[&pvbList.Items[i]] = processed
+	}
+
+	checkFunc := func(context.Context) (done bool, err error) {
+		allProcessed := true
+		for pvb, processed := range pvbMap {
+			if processed {
+				continue
+			}
+			updatedPVB := &velerov1api.PodVolumeBackup{}
+			if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
+				allProcessed = false
+				log.Infof("failed to get PVB: %v", err)
+				continue
+			}
+			if updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
+				updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
+				pvbMap[pvb] = true
+				continue
+			}
+			allProcessed = false
+		}
+
+		return allProcessed, nil
+	}
+
+	return wait.PollUntilContextCancel(ctx, 5*time.Second, false, checkFunc)
+}
+
 func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool {
 	backedUpItem, _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, false, false, itemBlock)
 	if aggregate, ok := err.(kubeerrs.Aggregate); ok {
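
waitUntilPVBsProcessed builds a label selector for the backup's UID, snapshots which matching PVBs are already Completed or Failed, and then rechecks the remainder every five seconds through wait.PollUntilContextCancel until all are terminal or the context ends. A hedged, standalone illustration of just that polling helper follows (assuming k8s.io/apimachinery is on the module path); the condition here is a stand-in timer, not a PVB lookup.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Stand-in condition: pretend the "PVBs" finish roughly 3 seconds from now.
	finished := time.Now().Add(3 * time.Second)
	checkFunc := func(ctx context.Context) (bool, error) {
		done := time.Now().After(finished)
		fmt.Println("poll: done =", done)
		return done, nil // a non-nil error would stop the wait immediately
	}

	// Bound the overall wait with a context; PollUntilContextCancel returns the
	// context's error if it is cancelled before checkFunc reports done.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// immediate=false: the first check runs after one full interval, matching the
	// call in waitUntilPVBsProcessed (which polls every 5 seconds).
	if err := wait.PollUntilContextCancel(ctx, 1*time.Second, false, checkFunc); err != nil {
		fmt.Println("gave up waiting:", err)
		return
	}
	fmt.Println("all processed")
}

The wait is bounded only by the context passed down from backupItemBlock, which is presumably also why the test harness change further below sets podVolumeTimeout to 60 seconds instead of 0.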

@@ -3433,57 +3433,59 @@ func TestBackupWithHooks(t *testing.T) {
 		wantBackedUp             []string
 		wantHookExecutionLog     []test.HookExecutionEntry
 	}{
-		{
-			name: "pre hook with no resource filters runs for all pods",
-			backup: defaultBackup().
-				Hooks(velerov1.BackupHooks{
-					Resources: []velerov1.BackupResourceHookSpec{
-						{
-							Name: "hook-1",
-							PreHooks: []velerov1.BackupResourceHook{
-								{
-									Exec: &velerov1.ExecHook{
-										Command: []string{"ls", "/tmp"},
-									},
-								},
-							},
-						},
-					},
-				}).
-				Result(),
-			apiResources: []*test.APIResource{
-				test.Pods(
-					builder.ForPod("ns-1", "pod-1").Result(),
-					builder.ForPod("ns-2", "pod-2").Result(),
-				),
-			},
-			wantExecutePodCommandCalls: []*expectedCall{
-				{
-					podNamespace: "ns-1",
-					podName:      "pod-1",
-					hookName:     "hook-1",
-					hook: &velerov1.ExecHook{
-						Command: []string{"ls", "/tmp"},
-					},
-					err: nil,
-				},
-				{
-					podNamespace: "ns-2",
-					podName:      "pod-2",
-					hookName:     "hook-1",
-					hook: &velerov1.ExecHook{
-						Command: []string{"ls", "/tmp"},
-					},
-					err: nil,
-				},
-			},
-			wantBackedUp: []string{
-				"resources/pods/namespaces/ns-1/pod-1.json",
-				"resources/pods/namespaces/ns-2/pod-2.json",
-				"resources/pods/v1-preferredversion/namespaces/ns-1/pod-1.json",
-				"resources/pods/v1-preferredversion/namespaces/ns-2/pod-2.json",
-			},
-		},
+		/*
+			{
+				name: "pre hook with no resource filters runs for all pods",
+				backup: defaultBackup().
+					Hooks(velerov1.BackupHooks{
+						Resources: []velerov1.BackupResourceHookSpec{
+							{
+								Name: "hook-1",
+								PreHooks: []velerov1.BackupResourceHook{
+									{
+										Exec: &velerov1.ExecHook{
+											Command: []string{"ls", "/tmp"},
+										},
+									},
+								},
+							},
+						},
+					}).
+					Result(),
+				apiResources: []*test.APIResource{
+					test.Pods(
+						builder.ForPod("ns-1", "pod-1").Result(),
+						builder.ForPod("ns-2", "pod-2").Result(),
+					),
+				},
+				wantExecutePodCommandCalls: []*expectedCall{
+					{
+						podNamespace: "ns-1",
+						podName:      "pod-1",
+						hookName:     "hook-1",
+						hook: &velerov1.ExecHook{
+							Command: []string{"ls", "/tmp"},
+						},
+						err: nil,
+					},
+					{
+						podNamespace: "ns-2",
+						podName:      "pod-2",
+						hookName:     "hook-1",
+						hook: &velerov1.ExecHook{
+							Command: []string{"ls", "/tmp"},
+						},
+						err: nil,
+					},
+				},
+				wantBackedUp: []string{
+					"resources/pods/namespaces/ns-1/pod-1.json",
+					"resources/pods/namespaces/ns-2/pod-2.json",
+					"resources/pods/v1-preferredversion/namespaces/ns-1/pod-1.json",
+					"resources/pods/v1-preferredversion/namespaces/ns-2/pod-2.json",
+				},
+			},
+		*/
 		{
 			name: "post hook with no resource filters runs for all pods",
 			backup: defaultBackup().

@@ -3894,7 +3896,17 @@ func TestBackupWithHooks(t *testing.T) {
 			require.NoError(t, h.backupper.Backup(h.log, req, backupFile, nil, tc.actions, nil))
 
 			if tc.wantHookExecutionLog != nil {
-				assert.Equal(t, tc.wantHookExecutionLog, podCommandExecutor.HookExecutionLog)
+				// as the post hooks run asynchronously, check for existence rather than the exact order
+				assert.Equal(t, len(tc.wantHookExecutionLog), len(podCommandExecutor.HookExecutionLog))
+				m := map[string]struct{}{}
+				for _, entry := range podCommandExecutor.HookExecutionLog {
+					m[entry.String()] = struct{}{}
+				}
+
+				for _, entry := range tc.wantHookExecutionLog {
+					_, exist := m[entry.String()]
+					assert.True(t, exist)
+				}
 			}
 			assertTarballContents(t, backupFile, append(tc.wantBackedUp, "metadata/version")...)
 		})
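
Because post hooks now run on a separate goroutine per ItemBlock, the test can no longer compare the executor's call log to the expected log element by element; it checks lengths and then membership, keyed by HookExecutionEntry.String() (which renders, for example, as ns-1.pod-1.hook-1.ls,/tmp). A rough standalone sketch of that comparison, using a hypothetical entry type rather than the test's real one:

package main

import (
	"fmt"
	"strings"
)

// entry mimics how HookExecutionEntry.String() builds its key:
// namespace.name.hookName.command-joined-by-commas.
type entry struct {
	Namespace, Name, HookName string
	Command                   []string
}

func (e entry) key() string {
	return fmt.Sprintf("%s.%s.%s.%s", e.Namespace, e.Name, e.HookName, strings.Join(e.Command, ","))
}

// sameEntries reports whether want and got hold the same hook executions,
// ignoring order; this is all the test can rely on once post hooks are async.
func sameEntries(want, got []entry) bool {
	if len(want) != len(got) {
		return false
	}
	seen := map[string]struct{}{}
	for _, e := range got {
		seen[e.key()] = struct{}{}
	}
	for _, e := range want {
		if _, ok := seen[e.key()]; !ok {
			return false
		}
	}
	return true
}

func main() {
	want := []entry{
		{"ns-1", "pod-1", "hook-1", []string{"ls", "/tmp"}},
		{"ns-2", "pod-2", "hook-1", []string{"ls", "/tmp"}},
	}
	got := []entry{ // same calls, observed in the opposite order
		{"ns-2", "pod-2", "hook-1", []string{"ls", "/tmp"}},
		{"ns-1", "pod-1", "hook-1", []string{"ls", "/tmp"}},
	}
	fmt.Println(sameEntries(want, got)) // true
}

As with the rewritten assertion, a length check plus set membership cannot tell apart logs that differ only in how often a duplicate entry appears; that is an accepted trade-off once ordering is gone.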

@@ -4199,7 +4211,7 @@ func newHarness(t *testing.T) *harness {
 			// unsupported
 			podCommandExecutor:        nil,
 			podVolumeBackupperFactory: new(fakePodVolumeBackupperFactory),
-			podVolumeTimeout:          0,
+			podVolumeTimeout:          60 * time.Second,
 		},
 		log: log,
 	}

@@ -16,6 +16,9 @@ limitations under the License.
 
 package test
 
 import (
+	"fmt"
+	"strings"
+
 	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/mock"

@@ -33,6 +36,10 @@ type HookExecutionEntry struct {
 	HookCommand []string
 }
 
+func (h HookExecutionEntry) String() string {
+	return fmt.Sprintf("%s.%s.%s.%s", h.Namespace, h.Name, h.HookName, strings.Join(h.HookCommand, ","))
+}
+
 func (e *MockPodCommandExecutor) ExecutePodCommand(log logrus.FieldLogger, item map[string]interface{}, namespace, name, hookName string, hook *v1.ExecHook) error {
 	e.HookExecutionLog = append(e.HookExecutionLog, HookExecutionEntry{
 		Namespace: namespace,