From 4cb6351f871b5f8c278956b4bf03e8cef10f7460 Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Fri, 21 Jul 2023 17:15:00 +0800 Subject: [PATCH] update isDelete of index task meta when delete task from index builder (#25717) Signed-off-by: aoiasd --- internal/datacoord/index_builder.go | 16 +++++------ internal/datacoord/index_builder_test.go | 36 ++++++++++++++++-------- internal/datacoord/index_meta.go | 24 ++++++++++++++++ internal/datacoord/index_meta_test.go | 26 +++++++++++++++++ 4 files changed, 81 insertions(+), 21 deletions(-) diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go index 9244cf440f..526f44ca9e 100644 --- a/internal/datacoord/index_builder.go +++ b/internal/datacoord/index_builder.go @@ -45,10 +45,6 @@ const ( indexTaskDone // index task need to retry. indexTaskRetry - // task has been deleted. - indexTaskDeleted - // task needs to prepare segment info on IndexNode - indexTaskPrepare reqTimeoutInterval = time.Second * 10 ) @@ -58,8 +54,6 @@ var TaskStateNames = map[indexTaskState]string{ 1: "InProgress", 2: "Done", 3: "Retry", - 4: "Deleted", - 5: "Prepare", } func (x indexTaskState) String() string { @@ -121,6 +115,9 @@ func (ib *indexBuilder) reloadFromKV() { segments := ib.meta.GetAllSegmentsUnsafe() for _, segment := range segments { for _, segIndex := range segment.segmentIndexes { + if segIndex.IsDeleted { + continue + } if segIndex.IndexState == commonpb.IndexState_Unissued { ib.tasks[segIndex.BuildID] = indexTaskInit } else if segIndex.IndexState == commonpb.IndexState_InProgress { @@ -223,6 +220,10 @@ func (ib *indexBuilder) process(buildID UniqueID) bool { segment := ib.meta.GetSegment(meta.SegmentID) if !isSegmentHealthy(segment) || !ib.meta.IsIndexExist(meta.CollectionID, meta.IndexID) { log.Ctx(ib.ctx).Info("task is no need to build index, remove it", zap.Int64("buildID", buildID)) + if err := ib.meta.DeleteTask(buildID); err != nil { + log.Ctx(ib.ctx).Warn("IndexCoord delete index failed", zap.Int64("buildID", buildID), zap.Error(err)) + return false + } deleteFunc(buildID) return true } @@ -329,9 +330,6 @@ func (ib *indexBuilder) process(buildID UniqueID) bool { } updateStateFunc(buildID, indexTaskInit) - case indexTaskDeleted: - deleteFunc(buildID) - default: // state: in_progress updateStateFunc(buildID, ib.getTaskState(buildID, meta.NodeID)) diff --git a/internal/datacoord/index_builder_test.go b/internal/datacoord/index_builder_test.go index 37dfeb81bb..68871bb934 100644 --- a/internal/datacoord/index_builder_test.go +++ b/internal/datacoord/index_builder_test.go @@ -627,10 +627,10 @@ func TestIndexBuilder(t *testing.T) { ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager) - assert.Equal(t, 7, len(ib.tasks)) + assert.Equal(t, 6, len(ib.tasks)) assert.Equal(t, indexTaskInit, ib.tasks[buildID]) assert.Equal(t, indexTaskInProgress, ib.tasks[buildID+1]) - assert.Equal(t, indexTaskInProgress, ib.tasks[buildID+2]) + // buildID+2 will be filter by isDeleted assert.Equal(t, indexTaskInit, ib.tasks[buildID+3]) assert.Equal(t, indexTaskInProgress, ib.tasks[buildID+8]) assert.Equal(t, indexTaskInit, ib.tasks[buildID+9]) @@ -708,16 +708,6 @@ func TestIndexBuilder_Error(t *testing.T) { assert.False(t, ok) }) - t.Run("init no need to build index", func(t *testing.T) { - ib.tasks[buildID] = indexTaskInit - ib.meta.indexes[collID][indexID].IsDeleted = true - ib.process(buildID) - - _, ok := ib.tasks[buildID] - assert.False(t, ok) - ib.meta.indexes[collID][indexID].IsDeleted = false - }) - t.Run("finish few rows task fail", func(t *testing.T) { ib.tasks[buildID+9] = indexTaskInit ib.process(buildID + 9) @@ -749,6 +739,28 @@ func TestIndexBuilder_Error(t *testing.T) { assert.Equal(t, indexTaskInit, state) }) + t.Run("no need to build index but update catalog failed", func(t *testing.T) { + ib.meta.catalog = ec + ib.meta.indexes[collID][indexID].IsDeleted = true + ib.tasks[buildID] = indexTaskInit + ok := ib.process(buildID) + assert.False(t, ok) + + _, ok = ib.tasks[buildID] + assert.True(t, ok) + }) + + t.Run("init no need to build index", func(t *testing.T) { + ib.meta.catalog = sc + ib.meta.indexes[collID][indexID].IsDeleted = true + ib.tasks[buildID] = indexTaskInit + ib.process(buildID) + + _, ok := ib.tasks[buildID] + assert.False(t, ok) + ib.meta.indexes[collID][indexID].IsDeleted = false + }) + t.Run("assign task error", func(t *testing.T) { paramtable.Get().Save(Params.CommonCfg.StorageType.Key, "local") ib.tasks[buildID] = indexTaskInit diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 66ebd247bd..b8cf2cd3f7 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -545,6 +545,30 @@ func (m *meta) FinishTask(taskInfo *indexpb.IndexTaskInfo) error { return nil } +func (m *meta) DeleteTask(buildID int64) error { + m.Lock() + defer m.Unlock() + + segIdx, ok := m.buildID2SegmentIndex[buildID] + if !ok { + log.Warn("there is no index with buildID", zap.Int64("buildID", buildID)) + return nil + } + + updateFunc := func(segIdx *model.SegmentIndex) error { + segIdx.IsDeleted = true + return m.alterSegmentIndexes([]*model.SegmentIndex{segIdx}) + } + + if err := m.updateSegIndexMeta(segIdx, updateFunc); err != nil { + return err + } + + log.Info("delete index task success", zap.Int64("buildID", buildID)) + m.updateIndexTasksMetrics() + return nil +} + // BuildIndex set the index state to be InProgress. It means IndexNode is building the index. func (m *meta) BuildIndex(buildID UniqueID) error { m.Lock() diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index 37c6627c25..bdd75e2966 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -1263,3 +1263,29 @@ func TestUpdateSegmentIndexNotExists(t *testing.T) { }) }) } + +func TestMeta_DeleteTask_Error(t *testing.T) { + m := &meta{buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex)} + t.Run("segment index not found", func(t *testing.T) { + err := m.DeleteTask(buildID) + assert.NoError(t, err) + }) + + t.Run("segment update failed", func(t *testing.T) { + ec := catalogmocks.NewDataCoordCatalog(t) + ec.On("AlterSegmentIndexes", + mock.Anything, + mock.Anything, + ).Return(errors.New("fail")) + m.catalog = ec + + m.buildID2SegmentIndex[buildID] = &model.SegmentIndex{ + SegmentID: segID, + PartitionID: partID, + CollectionID: collID, + } + + err := m.DeleteTask(buildID) + assert.Error(t, err) + }) +}