mirror of https://github.com/milvus-io/milvus.git
Set GetIndexState is compatible with SDK version 2.1 (#21608)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/21592/head
@ -512,6 +512,15 @@ func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexSta
}, nil
if req.GetIndexName() == "" && len(indexID2CreateTs) > 1 {
return &indexpb.GetIndexStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: ErrMsgAmbiguousIndexName,
}, nil
ret := &indexpb.GetIndexStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -637,6 +646,28 @@ func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.Get
}, nil
indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName)
if len(indexID2CreateTs) == 0 {
errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
log.Error("IndexCoord get index state fail", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
return &indexpb.GetIndexBuildProgressResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: errMsg,
}, nil
if req.GetIndexName() == "" && len(indexID2CreateTs) > 1 {
return &indexpb.GetIndexBuildProgressResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: ErrMsgAmbiguousIndexName,
}, nil
flushSegments, err := i.dataCoordClient.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{
CollectionID: req.CollectionID,
PartitionID: -1,
@ -667,25 +698,12 @@ func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.Get
totalRows += seg.NumOfRows
indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName)
if len(indexID2CreateTs) == 0 {
errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
log.Error("IndexCoord get index state fail", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
return &indexpb.GetIndexBuildProgressResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: errMsg,
}, nil
for indexID := range indexID2CreateTs {
indexRows = i.metaTable.GetIndexBuildProgress(indexID, flushSegments.Segments)
log.RatedInfo(5, "IndexCoord get index build progress success", zap.Int64("collID", req.CollectionID),
log.Info("IndexCoord get index build progress success", zap.Int64("collID", req.CollectionID),
zap.Int64("totalRows", totalRows), zap.Int64("indexRows", indexRows),
zap.Int("seg num", len(flushSegments.Segments)))
return &indexpb.GetIndexBuildProgressResponse{
@ -910,6 +928,9 @@ func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeInd
}, nil
log.Info("IndexCoord describe index success", zap.Int64("collectionID", req.CollectionID),
zap.Int64("indexID", indexInfo.IndexID), zap.Int64("total rows", indexInfo.TotalRows),
zap.Int64("index rows", indexInfo.IndexedRows), zap.String("index state", indexInfo.State.String()))
indexInfos = append(indexInfos, indexInfo)
@ -867,293 +867,6 @@ func TestIndexCoord_DropIndex(t *testing.T) {
// TODO @xiaocai2333: add ut for error occurred.
//func TestIndexCoord_watchNodeLoop(t *testing.T) {
// ech := make(chan *sessionutil.SessionEvent)
// in := &IndexCoord{
// loopWg: sync.WaitGroup{},
// loopCtx: context.Background(),
// eventChan: ech,
// session: &sessionutil.Session{
// TriggerKill: true,
// ServerID: 0,
// },
// }
// in.loopWg.Add(1)
// flag := false
// closed := false
// sigDone := make(chan struct{}, 1)
// sigQuit := make(chan struct{}, 1)
// sc := make(chan os.Signal, 1)
// signal.Notify(sc, syscall.SIGINT)
// defer signal.Reset(syscall.SIGINT)
// go func() {
// in.watchNodeLoop()
// flag = true
// sigDone <- struct{}{}
// }()
// go func() {
// <-sc
// closed = true
// sigQuit <- struct{}{}
// }()
// close(ech)
// <-sigDone
// <-sigQuit
// assert.True(t, flag)
// assert.True(t, closed)
//func TestIndexCoord_watchMetaLoop(t *testing.T) {
// ctx, cancel := context.WithCancel(context.Background())
// ic := &IndexCoord{
// loopCtx: ctx,
// loopWg: sync.WaitGroup{},
// }
// watchChan := make(chan clientv3.WatchResponse, 1024)
// client := &mockETCDKV{
// watchWithRevision: func(s string, i int64) clientv3.WatchChan {
// return watchChan
// },
// }
// mt := &metaTable{
// client: client,
// indexBuildID2Meta: map[UniqueID]*Meta{},
// etcdRevision: 0,
// lock: sync.RWMutex{},
// }
// ic.metaTable = mt
// t.Run("watch chan panic", func(t *testing.T) {
// ic.loopWg.Add(1)
// watchChan <- clientv3.WatchResponse{Canceled: true}
// assert.Panics(t, func() {
// ic.watchMetaLoop()
// })
// ic.loopWg.Wait()
// })
// t.Run("watch chan new meta table panic", func(t *testing.T) {
// client = &mockETCDKV{
// watchWithRevision: func(s string, i int64) clientv3.WatchChan {
// return watchChan
// },
// loadWithRevisionAndVersions: func(s string) ([]string, []string, []int64, int64, error) {
// return []string{}, []string{}, []int64{}, 0, fmt.Errorf("error occurred")
// },
// }
// mt = &metaTable{
// client: client,
// indexBuildID2Meta: map[UniqueID]*Meta{},
// etcdRevision: 0,
// lock: sync.RWMutex{},
// }
// ic.metaTable = mt
// ic.loopWg.Add(1)
// watchChan <- clientv3.WatchResponse{CompactRevision: 10}
// assert.Panics(t, func() {
// ic.watchMetaLoop()
// })
// ic.loopWg.Wait()
// })
// t.Run("watch chan new meta success", func(t *testing.T) {
// ic.loopWg = sync.WaitGroup{}
// client = &mockETCDKV{
// watchWithRevision: func(s string, i int64) clientv3.WatchChan {
// return watchChan
// },
// loadWithRevisionAndVersions: func(s string) ([]string, []string, []int64, int64, error) {
// return []string{}, []string{}, []int64{}, 0, nil
// },
// }
// mt = &metaTable{
// client: client,
// indexBuildID2Meta: map[UniqueID]*Meta{},
// etcdRevision: 0,
// lock: sync.RWMutex{},
// }
// ic.metaTable = mt
// ic.loopWg.Add(1)
// watchChan <- clientv3.WatchResponse{CompactRevision: 10}
// go ic.watchMetaLoop()
// cancel()
// ic.loopWg.Wait()
// })
//func TestIndexCoord_GetComponentStates(t *testing.T) {
// n := &IndexCoord{}
// n.stateCode.Store(commonpb.StateCode_Healthy)
// resp, err := n.GetComponentStates(context.Background())
// assert.NoError(t, err)
// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// assert.Equal(t, common.NotRegisteredID, resp.State.NodeID)
// n.session = &sessionutil.Session{}
// n.session.UpdateRegistered(true)
// resp, err = n.GetComponentStates(context.Background())
// assert.NoError(t, err)
// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
//func TestIndexCoord_NotHealthy(t *testing.T) {
// ic := &IndexCoord{}
// ic.stateCode.Store(commonpb.StateCode_Abnormal)
// req := &indexpb.BuildIndexRequest{}
// resp, err := ic.BuildIndex(context.Background(), req)
// assert.Error(t, err)
// assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
// req2 := &indexpb.DropIndexRequest{}
// status, err := ic.DropIndex(context.Background(), req2)
// assert.Nil(t, err)
// assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
// req3 := &indexpb.GetIndexStatesRequest{}
// resp2, err := ic.GetIndexStates(context.Background(), req3)
// assert.Nil(t, err)
// assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp2.Status.ErrorCode)
// req4 := &indexpb.GetIndexFilePathsRequest{
// IndexBuildIDs: []UniqueID{1, 2},
// }
// resp4, err := ic.GetIndexFilePaths(context.Background(), req4)
// assert.Nil(t, err)
// assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp4.Status.ErrorCode)
// req5 := &indexpb.RemoveIndexRequest{}
// resp5, err := ic.RemoveIndex(context.Background(), req5)
// assert.Nil(t, err)
// assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp5.GetErrorCode())
//func TestIndexCoord_GetIndexFilePaths(t *testing.T) {
// ic := &IndexCoord{
// metaTable: &metaTable{
// indexBuildID2Meta: map[UniqueID]*Meta{
// 1: {
// indexMeta: &indexpb.IndexMeta{
// IndexBuildID: 1,
// State: commonpb.IndexState_Finished,
// IndexFileKeys: []string{"indexFiles-1", "indexFiles-2"},
// },
// },
// 2: {
// indexMeta: &indexpb.IndexMeta{
// IndexBuildID: 2,
// State: commonpb.IndexState_Failed,
// },
// },
// },
// },
// }
// ic.stateCode.Store(commonpb.StateCode_Healthy)
// t.Run("GetIndexFilePaths success", func(t *testing.T) {
// resp, err := ic.GetIndexFilePaths(context.Background(), &indexpb.GetIndexFilePathsRequest{IndexBuildIDs: []UniqueID{1}})
// assert.NoError(t, err)
// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// assert.Equal(t, 1, len(resp.FilePaths))
// assert.ElementsMatch(t, resp.FilePaths[0].IndexFileKeys, []string{"indexFiles-1", "indexFiles-2"})
// })
// t.Run("GetIndexFilePaths failed", func(t *testing.T) {
// resp, err := ic.GetIndexFilePaths(context.Background(), &indexpb.GetIndexFilePathsRequest{IndexBuildIDs: []UniqueID{2}})
// assert.NoError(t, err)
// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// assert.Equal(t, 0, len(resp.FilePaths[0].IndexFileKeys))
// })
// t.Run("set DataCoord with nil", func(t *testing.T) {
// err := ic.SetDataCoord(nil)
// assert.Error(t, err)
// })
//func Test_tryAcquireSegmentReferLock(t *testing.T) {
// ic := &IndexCoord{
// session: &sessionutil.Session{
// ServerID: 1,
// },
// }
// dcm := &DataCoordMock{
// Err: false,
// Fail: false,
// }
// cmm := &ChunkManagerMock{
// Err: false,
// Fail: false,
// }
// ic.dataCoordClient = dcm
// ic.chunkManager = cmm
// t.Run("success", func(t *testing.T) {
// err := ic.tryAcquireSegmentReferLock(context.Background(), 1, 1, []UniqueID{1})
// assert.Nil(t, err)
// })
// t.Run("error", func(t *testing.T) {
// dcmE := &DataCoordMock{
// Err: true,
// Fail: false,
// }
// ic.dataCoordClient = dcmE
// err := ic.tryAcquireSegmentReferLock(context.Background(), 1, 1, []UniqueID{1})
// assert.Error(t, err)
// })
// t.Run("Fail", func(t *testing.T) {
// dcmF := &DataCoordMock{
// Err: false,
// Fail: true,
// }
// ic.dataCoordClient = dcmF
// err := ic.tryAcquireSegmentReferLock(context.Background(), 1, 1, []UniqueID{1})
// assert.Error(t, err)
// })
//func Test_tryReleaseSegmentReferLock(t *testing.T) {
// ic := &IndexCoord{
// session: &sessionutil.Session{
// ServerID: 1,
// },
// }
// dcm := &DataCoordMock{
// Err: false,
// Fail: false,
// }
// ic.dataCoordClient = dcm
// t.Run("success", func(t *testing.T) {
// err := ic.tryReleaseSegmentReferLock(context.Background(), 1, 1)
// assert.NoError(t, err)
// })
//func TestIndexCoord_RemoveIndex(t *testing.T) {
// ic := &IndexCoord{
// metaTable: &metaTable{},
// indexBuilder: &indexBuilder{
// notify: make(chan struct{}, 10),
// },
// }
// ic.stateCode.Store(commonpb.StateCode_Healthy)
// status, err := ic.RemoveIndex(context.Background(), &indexpb.RemoveIndexRequest{BuildIDs: []UniqueID{0}})
// assert.Nil(t, err)
// assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
func TestIndexCoord_pullSegmentInfo(t *testing.T) {
t.Run("success", func(t *testing.T) {
ic := &IndexCoord{
@ -1410,3 +1123,117 @@ func TestIndexCoord_CreateIndex(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
func TestIndexCoord_GetIndexState(t *testing.T) {
ic := &IndexCoord{
metaTable: &metaTable{
catalog: &indexcoord.Catalog{Txn: NewMockEtcdKV()},
collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
req := &indexpb.GetIndexStateRequest{
CollectionID: collID,
IndexName: "",
ctx := context.Background()
t.Run("no index", func(t *testing.T) {
resp, err := ic.GetIndexState(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
err := ic.metaTable.CreateIndex(&model.Index{
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: "_default_idx_101",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
assert.NoError(t, err)
err = ic.metaTable.CreateIndex(&model.Index{
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 1,
IndexID: indexID + 1,
IndexName: "_default_idx_102",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
assert.NoError(t, err)
t.Run("multiple indexes", func(t *testing.T) {
resp, err := ic.GetIndexState(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
func TestIndexCoord_GetIndexBuildProgress(t *testing.T) {
ic := &IndexCoord{
metaTable: &metaTable{
catalog: &indexcoord.Catalog{Txn: NewMockEtcdKV()},
collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
req := &indexpb.GetIndexBuildProgressRequest{
CollectionID: collID,
IndexName: "",
ctx := context.Background()
t.Run("no index", func(t *testing.T) {
resp, err := ic.GetIndexBuildProgress(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
err := ic.metaTable.CreateIndex(&model.Index{
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: "_default_idx_101",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
assert.NoError(t, err)
err = ic.metaTable.CreateIndex(&model.Index{
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 1,
IndexID: indexID + 1,
IndexName: "_default_idx_102",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
assert.NoError(t, err)
t.Run("multiple indexes", func(t *testing.T) {
resp, err := ic.GetIndexBuildProgress(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
@ -618,10 +618,6 @@ func (gibpt *getIndexBuildProgressTask) Execute(ctx context.Context) error {
gibpt.collectionID = collectionID
if gibpt.IndexName == "" {
gibpt.IndexName = Params.CommonCfg.DefaultIndexName
resp, err := gibpt.indexCoord.GetIndexBuildProgress(ctx, &indexpb.GetIndexBuildProgressRequest{
CollectionID: collectionID,
IndexName: gibpt.IndexName,
@ -704,10 +700,6 @@ func (gist *getIndexStateTask) PreExecute(ctx context.Context) error {
func (gist *getIndexStateTask) Execute(ctx context.Context) error {
if gist.IndexName == "" {
gist.IndexName = Params.CommonCfg.DefaultIndexName
collectionID, err := globalMetaCache.GetCollectionID(ctx, gist.CollectionName)
if err != nil {
return err
Reference in New Issue