Merge pull request #7611 from qiuming-best/datamover-cancel
Fix cancel bug && adjust StartTimestamp for data moverpull/7256/merge
commit
c2d267d894
|
@ -271,6 +271,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
|||
// Update status to InProgress
|
||||
original := dd.DeepCopy()
|
||||
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
|
||||
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
|
||||
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
|
||||
log.WithError(err).Error("Unable to update status to in progress")
|
||||
return ctrl.Result{}, err
|
||||
|
@ -290,7 +291,11 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
|
|||
log.Info("Data download is being canceled")
|
||||
fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)
|
||||
if fsRestore == nil {
|
||||
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
|
||||
if r.nodeName == dd.Status.Node {
|
||||
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
|
||||
} else {
|
||||
log.Info("Data path is not started in this node and will not canceled by current node")
|
||||
}
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
|
@ -668,7 +673,6 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel
|
|||
|
||||
updateFunc := func(datadownload *velerov2alpha1api.DataDownload) {
|
||||
datadownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
|
||||
datadownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
|
||||
labels := datadownload.GetLabels()
|
||||
if labels == nil {
|
||||
labels = make(map[string]string)
|
||||
|
|
|
@ -149,7 +149,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...
|
|||
|
||||
dataPathMgr := datapath.NewManager(1)
|
||||
|
||||
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
|
||||
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
|
||||
}
|
||||
|
||||
func TestDataDownloadReconcile(t *testing.T) {
|
||||
|
@ -207,11 +207,40 @@ func TestDataDownloadReconcile(t *testing.T) {
|
|||
mockCancel: true,
|
||||
},
|
||||
{
|
||||
name: "Cancel data downloand in progress",
|
||||
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result(),
|
||||
name: "Cancel data downloand in progress with create FSBR",
|
||||
dd: func() *velerov2alpha1api.DataDownload {
|
||||
dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result()
|
||||
dd.Status.Node = "test-node"
|
||||
return dd
|
||||
}(),
|
||||
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
|
||||
needCreateFSBR: true,
|
||||
mockCancel: true,
|
||||
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceling).Result(),
|
||||
},
|
||||
{
|
||||
name: "Cancel data downloand in progress without create FSBR",
|
||||
dd: func() *velerov2alpha1api.DataDownload {
|
||||
dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result()
|
||||
dd.Status.Node = "test-node"
|
||||
return dd
|
||||
}(),
|
||||
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
|
||||
needCreateFSBR: false,
|
||||
mockCancel: true,
|
||||
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
|
||||
},
|
||||
{
|
||||
name: "Cancel data downloand in progress in different node",
|
||||
dd: func() *velerov2alpha1api.DataDownload {
|
||||
dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Cancel(true).Result()
|
||||
dd.Status.Node = "different-node"
|
||||
return dd
|
||||
}(),
|
||||
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
|
||||
needCreateFSBR: false,
|
||||
mockCancel: true,
|
||||
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result(),
|
||||
},
|
||||
{
|
||||
name: "Error in data path is concurrent limited",
|
||||
|
|
|
@ -282,6 +282,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
|||
// Update status to InProgress
|
||||
original := du.DeepCopy()
|
||||
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
|
||||
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
|
||||
if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil {
|
||||
return r.errorOut(ctx, du, err, "error updating dataupload status", log)
|
||||
}
|
||||
|
@ -300,7 +301,11 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
|||
|
||||
fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
|
||||
if fsBackup == nil {
|
||||
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
|
||||
if du.Status.Node == r.nodeName {
|
||||
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
|
||||
} else {
|
||||
log.Info("Data path is not started in this node and will not canceled by current node")
|
||||
}
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
|
@ -720,7 +725,6 @@ func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov
|
|||
|
||||
updateFunc := func(dataUpload *velerov2alpha1api.DataUpload) {
|
||||
dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted
|
||||
dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
|
||||
labels := dataUpload.GetLabels()
|
||||
if labels == nil {
|
||||
labels = make(map[string]string)
|
||||
|
|
|
@ -233,7 +233,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
|
|||
return nil, err
|
||||
}
|
||||
return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil,
|
||||
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
|
||||
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
|
||||
}
|
||||
|
||||
func dataUploadBuilder() *builder.DataUploadBuilder {
|
||||
|
@ -336,6 +336,7 @@ func TestReconcile(t *testing.T) {
|
|||
expectedErrMsg string
|
||||
needErrs []bool
|
||||
peekErr error
|
||||
notCreateFSBR bool
|
||||
}{
|
||||
{
|
||||
name: "Dataupload is not initialized",
|
||||
|
@ -412,6 +413,32 @@ func TestReconcile(t *testing.T) {
|
|||
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(),
|
||||
expectedRequeue: ctrl.Result{},
|
||||
},
|
||||
{
|
||||
name: "Dataupload should be cancel with match node",
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
|
||||
du: func() *velerov2alpha1api.DataUpload {
|
||||
du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result()
|
||||
du.Status.Node = "test-node"
|
||||
return du
|
||||
}(),
|
||||
expectedProcessed: true,
|
||||
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(),
|
||||
expectedRequeue: ctrl.Result{},
|
||||
notCreateFSBR: true,
|
||||
},
|
||||
{
|
||||
name: "Dataupload should not be cancel with dismatch node",
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
|
||||
du: func() *velerov2alpha1api.DataUpload {
|
||||
du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result()
|
||||
du.Status.Node = "different_node"
|
||||
return du
|
||||
}(),
|
||||
expectedProcessed: false,
|
||||
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(),
|
||||
expectedRequeue: ctrl.Result{},
|
||||
notCreateFSBR: true,
|
||||
},
|
||||
{
|
||||
name: "runCancelableDataUpload is concurrent limited",
|
||||
dataMgr: datapath.NewManager(0),
|
||||
|
@ -511,16 +538,17 @@ func TestReconcile(t *testing.T) {
|
|||
} else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
|
||||
r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())}
|
||||
}
|
||||
|
||||
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
return &fakeDataUploadFSBR{
|
||||
du: test.du,
|
||||
kubeClient: r.client,
|
||||
clock: r.Clock,
|
||||
if !test.notCreateFSBR {
|
||||
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
return &fakeDataUploadFSBR{
|
||||
du: test.du,
|
||||
kubeClient: r.client,
|
||||
clock: r.Clock,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
|
||||
if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress && !test.notCreateFSBR {
|
||||
if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil {
|
||||
_, err := r.dataPathMgr.CreateFileSystemBR(test.du.Name, pVBRRequestor, ctx, r.client, velerov1api.DefaultNamespace, datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, velerotest.NewLogger())
|
||||
require.NoError(t, err)
|
||||
|
|
Loading…
Reference in New Issue