mirror of https://github.com/milvus-io/milvus.git
Fix bug for IndexCoord recycles index files (#18168)
Signed-off-by: Cai.Zhang <cai.zhang@zilliz.com>
pull/18183/head
parent
0fb5e9c1a9
commit
90faf4a212
|
@ -107,7 +107,6 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
|
||||||
case <-gc.ctx.Done():
|
case <-gc.ctx.Done():
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
indexID2Files := gc.metaTable.GetBuildID2IndexFiles()
|
|
||||||
prefix := Params.IndexNodeCfg.IndexStorageRootPath + "/"
|
prefix := Params.IndexNodeCfg.IndexStorageRootPath + "/"
|
||||||
// list dir first
|
// list dir first
|
||||||
keys, err := gc.chunkManager.ListWithPrefix(prefix, false)
|
keys, err := gc.chunkManager.ListWithPrefix(prefix, false)
|
||||||
|
@ -121,8 +120,8 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
|
||||||
log.Error("IndexCoord garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.String("key", key), zap.Error(err))
|
log.Error("IndexCoord garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.String("key", key), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
indexFiles, ok := indexID2Files[buildID]
|
log.Info("IndexCoord garbageCollector will recycle index files", zap.Int64("buildID", buildID))
|
||||||
if !ok {
|
if !gc.metaTable.HasBuildID(buildID) {
|
||||||
// buildID no longer exists in meta, remove all index files
|
// buildID no longer exists in meta, remove all index files
|
||||||
log.Info("IndexCoord garbageCollector recycleUnusedIndexFiles find meta has not exist, remove index files",
|
log.Info("IndexCoord garbageCollector recycleUnusedIndexFiles find meta has not exist, remove index files",
|
||||||
zap.Int64("buildID", buildID))
|
zap.Int64("buildID", buildID))
|
||||||
|
@ -134,13 +133,17 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Prevent IndexNode from being recycled as soon as the index file is written
|
log.Info("index meta can be recycled, recycle index files", zap.Int64("buildID", buildID))
|
||||||
if !gc.metaTable.CanBeRecycledIndexFiles(buildID) {
|
indexInfo, err := gc.metaTable.GetIndexFilePathInfo(buildID)
|
||||||
|
if err != nil {
|
||||||
|
// Even if the index is marked as deleted, the index file will not be recycled, wait for the next gc,
|
||||||
|
// and delete all index files about the buildID at one time.
|
||||||
|
log.Warn("IndexCoord garbageCollector get index files fail", zap.Int64("buildID", buildID),
|
||||||
|
zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// buildID still exists in meta, remove unnecessary index files
|
|
||||||
filesMap := make(map[string]bool)
|
filesMap := make(map[string]bool)
|
||||||
for _, file := range indexFiles {
|
for _, file := range indexInfo.IndexFilePaths {
|
||||||
filesMap[file] = true
|
filesMap[file] = true
|
||||||
}
|
}
|
||||||
files, err := gc.chunkManager.ListWithPrefix(key, true)
|
files, err := gc.chunkManager.ListWithPrefix(key, true)
|
||||||
|
@ -149,6 +152,9 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
|
||||||
zap.Int64("buildID", buildID), zap.String("prefix", key), zap.Error(err))
|
zap.Int64("buildID", buildID), zap.String("prefix", key), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
log.Info("recycle index files", zap.Int64("buildID", buildID), zap.Int("meta files num", len(filesMap)),
|
||||||
|
zap.Int("chunkManager files num", len(files)))
|
||||||
|
deletedFilesNum := 0
|
||||||
for _, file := range files {
|
for _, file := range files {
|
||||||
if _, ok := filesMap[file]; !ok {
|
if _, ok := filesMap[file]; !ok {
|
||||||
if err = gc.chunkManager.Remove(file); err != nil {
|
if err = gc.chunkManager.Remove(file); err != nil {
|
||||||
|
@ -156,8 +162,11 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
|
||||||
zap.Int64("buildID", buildID), zap.String("file", file), zap.Error(err))
|
zap.Int64("buildID", buildID), zap.String("file", file), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
deletedFilesNum++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log.Info("index files recycle success", zap.Int64("buildID", buildID),
|
||||||
|
zap.Int("delete index files num", deletedFilesNum))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -288,6 +288,54 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
|
||||||
cancel()
|
cancel()
|
||||||
gc.wg.Wait()
|
gc.wg.Wait()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("meta mark deleted", func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
gc := &garbageCollector{
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
wg: sync.WaitGroup{},
|
||||||
|
gcFileDuration: time.Millisecond * 300,
|
||||||
|
gcMetaDuration: time.Millisecond * 300,
|
||||||
|
metaTable: &metaTable{
|
||||||
|
indexBuildID2Meta: map[UniqueID]*Meta{
|
||||||
|
1: {
|
||||||
|
indexMeta: &indexpb.IndexMeta{
|
||||||
|
IndexBuildID: 1,
|
||||||
|
IndexFilePaths: []string{"file1", "file2", "file3"},
|
||||||
|
State: commonpb.IndexState_Finished,
|
||||||
|
MarkDeleted: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
client: &mockETCDKV{
|
||||||
|
remove: func(s string) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
chunkManager: &chunkManagerMock{
|
||||||
|
removeWithPrefix: func(s string) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
listWithPrefix: func(s string, recursive bool) ([]string, error) {
|
||||||
|
if !recursive {
|
||||||
|
return []string{"a/b/1/"}, nil
|
||||||
|
}
|
||||||
|
return []string{"a/b/1/c"}, nil
|
||||||
|
},
|
||||||
|
remove: func(s string) error {
|
||||||
|
return fmt.Errorf("error")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
gc.wg.Add(1)
|
||||||
|
go gc.recycleUnusedIndexFiles()
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
cancel()
|
||||||
|
gc.wg.Wait()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIndexCoord_recycleUnusedMetaLoop(t *testing.T) {
|
func TestIndexCoord_recycleUnusedMetaLoop(t *testing.T) {
|
||||||
|
|
|
@ -476,8 +476,8 @@ func TestIndexCoord_GetIndexFilePaths(t *testing.T) {
|
||||||
t.Run("GetIndexFilePaths failed", func(t *testing.T) {
|
t.Run("GetIndexFilePaths failed", func(t *testing.T) {
|
||||||
resp, err := ic.GetIndexFilePaths(context.Background(), &indexpb.GetIndexFilePathsRequest{IndexBuildIDs: []UniqueID{2}})
|
resp, err := ic.GetIndexFilePaths(context.Background(), &indexpb.GetIndexFilePathsRequest{IndexBuildIDs: []UniqueID{2}})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||||
assert.NotEqual(t, "", resp.Status.Reason)
|
assert.Equal(t, 0, len(resp.FilePaths[0].IndexFilePaths))
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("set DataCoord with nil", func(t *testing.T) {
|
t.Run("set DataCoord with nil", func(t *testing.T) {
|
||||||
|
|
|
@ -461,7 +461,7 @@ func (mt *metaTable) GetIndexFilePathInfo(indexBuildID UniqueID) (*indexpb.Index
|
||||||
if meta.indexMeta.MarkDeleted {
|
if meta.indexMeta.MarkDeleted {
|
||||||
return nil, fmt.Errorf("index not exists with ID = %d", indexBuildID)
|
return nil, fmt.Errorf("index not exists with ID = %d", indexBuildID)
|
||||||
}
|
}
|
||||||
if meta.indexMeta.State != commonpb.IndexState_Finished {
|
if meta.indexMeta.State != commonpb.IndexState_Finished && meta.indexMeta.State != commonpb.IndexState_Failed {
|
||||||
return nil, fmt.Errorf("index not finished with ID = %d", indexBuildID)
|
return nil, fmt.Errorf("index not finished with ID = %d", indexBuildID)
|
||||||
}
|
}
|
||||||
ret.IndexFilePaths = meta.indexMeta.IndexFilePaths
|
ret.IndexFilePaths = meta.indexMeta.IndexFilePaths
|
||||||
|
@ -605,25 +605,10 @@ func (mt *metaTable) NeedUpdateMeta(m *Meta) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mt *metaTable) CanBeRecycledIndexFiles(buildID UniqueID) bool {
|
func (mt *metaTable) HasBuildID(buildID UniqueID) bool {
|
||||||
mt.lock.RLock()
|
mt.lock.RLock()
|
||||||
defer mt.lock.RUnlock()
|
defer mt.lock.RUnlock()
|
||||||
|
|
||||||
meta, ok := mt.indexBuildID2Meta[buildID]
|
_, ok := mt.indexBuildID2Meta[buildID]
|
||||||
if !ok {
|
return ok
|
||||||
log.Debug("index meta not exist, can be recycled", zap.Int64("buildID", buildID))
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if meta.indexMeta.MarkDeleted {
|
|
||||||
log.Debug("index has been deleted, can be recycled", zap.Int64("buildID", buildID))
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if meta.indexMeta.State == commonpb.IndexState_Finished || meta.indexMeta.State == commonpb.IndexState_Failed {
|
|
||||||
log.Debug("index has been finished, can be recycled", zap.Int64("buildID", buildID),
|
|
||||||
zap.String("state", meta.indexMeta.State.String()))
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
log.Debug("index meta can not be recycled", zap.Int64("buildID", buildID),
|
|
||||||
zap.Bool("deleted", meta.indexMeta.MarkDeleted), zap.String("state", meta.indexMeta.String()))
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -216,7 +216,7 @@ func (it *IndexBuildTask) saveIndexMeta(ctx context.Context) error {
|
||||||
fn := func() error {
|
fn := func() error {
|
||||||
indexMeta, version, err := it.loadIndexMeta(ctx)
|
indexMeta, version, err := it.loadIndexMeta(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("IndexNode IndexBuildTask saveIndexMeta fail to load index meta,", zap.Int64("build Id", indexMeta.IndexBuildID), zap.Error(err))
|
log.Error("IndexNode IndexBuildTask saveIndexMeta fail to load index meta,", zap.Int64("build Id", it.req.IndexBuildID), zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
taskState := it.updateTaskState(indexMeta, it.internalErr)
|
taskState := it.updateTaskState(indexMeta, it.internalErr)
|
||||||
|
|
Loading…
Reference in New Issue