update isDelete of index task meta when delete task from index builder (#25717)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/25825/head
aoiasd 2023-07-21 17:15:00 +08:00 committed by GitHub
parent 32827f538a
commit 4cb6351f87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 81 additions and 21 deletions

View File

@ -45,10 +45,6 @@ const (
indexTaskDone
// index task need to retry.
indexTaskRetry
// task has been deleted.
indexTaskDeleted
// task needs to prepare segment info on IndexNode
indexTaskPrepare
reqTimeoutInterval = time.Second * 10
)
@ -58,8 +54,6 @@ var TaskStateNames = map[indexTaskState]string{
1: "InProgress",
2: "Done",
3: "Retry",
4: "Deleted",
5: "Prepare",
}
func (x indexTaskState) String() string {
@ -121,6 +115,9 @@ func (ib *indexBuilder) reloadFromKV() {
segments := ib.meta.GetAllSegmentsUnsafe()
for _, segment := range segments {
for _, segIndex := range segment.segmentIndexes {
if segIndex.IsDeleted {
continue
}
if segIndex.IndexState == commonpb.IndexState_Unissued {
ib.tasks[segIndex.BuildID] = indexTaskInit
} else if segIndex.IndexState == commonpb.IndexState_InProgress {
@ -223,6 +220,10 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
segment := ib.meta.GetSegment(meta.SegmentID)
if !isSegmentHealthy(segment) || !ib.meta.IsIndexExist(meta.CollectionID, meta.IndexID) {
log.Ctx(ib.ctx).Info("task is no need to build index, remove it", zap.Int64("buildID", buildID))
if err := ib.meta.DeleteTask(buildID); err != nil {
log.Ctx(ib.ctx).Warn("IndexCoord delete index failed", zap.Int64("buildID", buildID), zap.Error(err))
return false
}
deleteFunc(buildID)
return true
}
@ -329,9 +330,6 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
}
updateStateFunc(buildID, indexTaskInit)
case indexTaskDeleted:
deleteFunc(buildID)
default:
// state: in_progress
updateStateFunc(buildID, ib.getTaskState(buildID, meta.NodeID))

View File

@ -627,10 +627,10 @@ func TestIndexBuilder(t *testing.T) {
ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager)
assert.Equal(t, 7, len(ib.tasks))
assert.Equal(t, 6, len(ib.tasks))
assert.Equal(t, indexTaskInit, ib.tasks[buildID])
assert.Equal(t, indexTaskInProgress, ib.tasks[buildID+1])
assert.Equal(t, indexTaskInProgress, ib.tasks[buildID+2])
// buildID+2 will be filter by isDeleted
assert.Equal(t, indexTaskInit, ib.tasks[buildID+3])
assert.Equal(t, indexTaskInProgress, ib.tasks[buildID+8])
assert.Equal(t, indexTaskInit, ib.tasks[buildID+9])
@ -708,16 +708,6 @@ func TestIndexBuilder_Error(t *testing.T) {
assert.False(t, ok)
})
t.Run("init no need to build index", func(t *testing.T) {
ib.tasks[buildID] = indexTaskInit
ib.meta.indexes[collID][indexID].IsDeleted = true
ib.process(buildID)
_, ok := ib.tasks[buildID]
assert.False(t, ok)
ib.meta.indexes[collID][indexID].IsDeleted = false
})
t.Run("finish few rows task fail", func(t *testing.T) {
ib.tasks[buildID+9] = indexTaskInit
ib.process(buildID + 9)
@ -749,6 +739,28 @@ func TestIndexBuilder_Error(t *testing.T) {
assert.Equal(t, indexTaskInit, state)
})
t.Run("no need to build index but update catalog failed", func(t *testing.T) {
ib.meta.catalog = ec
ib.meta.indexes[collID][indexID].IsDeleted = true
ib.tasks[buildID] = indexTaskInit
ok := ib.process(buildID)
assert.False(t, ok)
_, ok = ib.tasks[buildID]
assert.True(t, ok)
})
t.Run("init no need to build index", func(t *testing.T) {
ib.meta.catalog = sc
ib.meta.indexes[collID][indexID].IsDeleted = true
ib.tasks[buildID] = indexTaskInit
ib.process(buildID)
_, ok := ib.tasks[buildID]
assert.False(t, ok)
ib.meta.indexes[collID][indexID].IsDeleted = false
})
t.Run("assign task error", func(t *testing.T) {
paramtable.Get().Save(Params.CommonCfg.StorageType.Key, "local")
ib.tasks[buildID] = indexTaskInit

View File

@ -545,6 +545,30 @@ func (m *meta) FinishTask(taskInfo *indexpb.IndexTaskInfo) error {
return nil
}
func (m *meta) DeleteTask(buildID int64) error {
m.Lock()
defer m.Unlock()
segIdx, ok := m.buildID2SegmentIndex[buildID]
if !ok {
log.Warn("there is no index with buildID", zap.Int64("buildID", buildID))
return nil
}
updateFunc := func(segIdx *model.SegmentIndex) error {
segIdx.IsDeleted = true
return m.alterSegmentIndexes([]*model.SegmentIndex{segIdx})
}
if err := m.updateSegIndexMeta(segIdx, updateFunc); err != nil {
return err
}
log.Info("delete index task success", zap.Int64("buildID", buildID))
m.updateIndexTasksMetrics()
return nil
}
// BuildIndex set the index state to be InProgress. It means IndexNode is building the index.
func (m *meta) BuildIndex(buildID UniqueID) error {
m.Lock()

View File

@ -1263,3 +1263,29 @@ func TestUpdateSegmentIndexNotExists(t *testing.T) {
})
})
}
func TestMeta_DeleteTask_Error(t *testing.T) {
m := &meta{buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex)}
t.Run("segment index not found", func(t *testing.T) {
err := m.DeleteTask(buildID)
assert.NoError(t, err)
})
t.Run("segment update failed", func(t *testing.T) {
ec := catalogmocks.NewDataCoordCatalog(t)
ec.On("AlterSegmentIndexes",
mock.Anything,
mock.Anything,
).Return(errors.New("fail"))
m.catalog = ec
m.buildID2SegmentIndex[buildID] = &model.SegmentIndex{
SegmentID: segID,
PartitionID: partID,
CollectionID: collID,
}
err := m.DeleteTask(buildID)
assert.Error(t, err)
})
}