From 120c8761223a21e06bf756b44daa8246de634979 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Tue, 10 Jan 2023 16:41:39 +0800 Subject: [PATCH] Set GetIndexState is compatible with SDK version 2.1 (#21608) Signed-off-by: cai.zhang --- internal/indexcoord/index_coord.go | 49 ++- internal/indexcoord/index_coord_test.go | 401 +++++++----------------- internal/proxy/task_index.go | 8 - 3 files changed, 149 insertions(+), 309 deletions(-) diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index a9970ae85c..2e87ddc23f 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -512,6 +512,15 @@ func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexSta }, }, nil } + if req.GetIndexName() == "" && len(indexID2CreateTs) > 1 { + log.Warn(ErrMsgAmbiguousIndexName) + return &indexpb.GetIndexStateResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: ErrMsgAmbiguousIndexName, + }, + }, nil + } ret := &indexpb.GetIndexStateResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -637,6 +646,28 @@ func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.Get }, nil } + indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName) + if len(indexID2CreateTs) == 0 { + errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName) + log.Error("IndexCoord get index state fail", zap.Int64("collectionID", req.CollectionID), + zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg)) + return &indexpb.GetIndexBuildProgressResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: errMsg, + }, + }, nil + } + if req.GetIndexName() == "" && len(indexID2CreateTs) > 1 { + log.Warn(ErrMsgAmbiguousIndexName) + return &indexpb.GetIndexBuildProgressResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: ErrMsgAmbiguousIndexName, + }, + }, nil + } + flushSegments, err := i.dataCoordClient.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{ CollectionID: req.CollectionID, PartitionID: -1, @@ -667,25 +698,12 @@ func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.Get totalRows += seg.NumOfRows } - indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName) - if len(indexID2CreateTs) == 0 { - errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName) - log.Error("IndexCoord get index state fail", zap.Int64("collectionID", req.CollectionID), - zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg)) - return &indexpb.GetIndexBuildProgressResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: errMsg, - }, - }, nil - } - for indexID := range indexID2CreateTs { indexRows = i.metaTable.GetIndexBuildProgress(indexID, flushSegments.Segments) break } - log.RatedInfo(5, "IndexCoord get index build progress success", zap.Int64("collID", req.CollectionID), + log.Info("IndexCoord get index build progress success", zap.Int64("collID", req.CollectionID), zap.Int64("totalRows", totalRows), zap.Int64("indexRows", indexRows), zap.Int("seg num", len(flushSegments.Segments))) return &indexpb.GetIndexBuildProgressResponse{ @@ -910,6 +928,9 @@ func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeInd }, }, nil } + log.Info("IndexCoord describe index success", zap.Int64("collectionID", req.CollectionID), + zap.Int64("indexID", indexInfo.IndexID), zap.Int64("total rows", indexInfo.TotalRows), + zap.Int64("index rows", indexInfo.IndexedRows), zap.String("index state", indexInfo.State.String())) indexInfos = append(indexInfos, indexInfo) } diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go index 44eb8fcf54..b00c9798ae 100644 --- a/internal/indexcoord/index_coord_test.go +++ b/internal/indexcoord/index_coord_test.go @@ -867,293 +867,6 @@ func TestIndexCoord_DropIndex(t *testing.T) { }) } -// TODO @xiaocai2333: add ut for error occurred. - -//func TestIndexCoord_watchNodeLoop(t *testing.T) { -// ech := make(chan *sessionutil.SessionEvent) -// in := &IndexCoord{ -// loopWg: sync.WaitGroup{}, -// loopCtx: context.Background(), -// eventChan: ech, -// session: &sessionutil.Session{ -// TriggerKill: true, -// ServerID: 0, -// }, -// } -// in.loopWg.Add(1) -// -// flag := false -// closed := false -// sigDone := make(chan struct{}, 1) -// sigQuit := make(chan struct{}, 1) -// sc := make(chan os.Signal, 1) -// signal.Notify(sc, syscall.SIGINT) -// defer signal.Reset(syscall.SIGINT) -// -// go func() { -// in.watchNodeLoop() -// flag = true -// sigDone <- struct{}{} -// }() -// go func() { -// <-sc -// closed = true -// sigQuit <- struct{}{} -// }() -// -// close(ech) -// <-sigDone -// <-sigQuit -// assert.True(t, flag) -// assert.True(t, closed) -//} -// -//func TestIndexCoord_watchMetaLoop(t *testing.T) { -// ctx, cancel := context.WithCancel(context.Background()) -// ic := &IndexCoord{ -// loopCtx: ctx, -// loopWg: sync.WaitGroup{}, -// } -// -// watchChan := make(chan clientv3.WatchResponse, 1024) -// -// client := &mockETCDKV{ -// watchWithRevision: func(s string, i int64) clientv3.WatchChan { -// return watchChan -// }, -// } -// mt := &metaTable{ -// client: client, -// indexBuildID2Meta: map[UniqueID]*Meta{}, -// etcdRevision: 0, -// lock: sync.RWMutex{}, -// } -// ic.metaTable = mt -// -// t.Run("watch chan panic", func(t *testing.T) { -// ic.loopWg.Add(1) -// watchChan <- clientv3.WatchResponse{Canceled: true} -// -// assert.Panics(t, func() { -// ic.watchMetaLoop() -// }) -// ic.loopWg.Wait() -// }) -// -// t.Run("watch chan new meta table panic", func(t *testing.T) { -// client = &mockETCDKV{ -// watchWithRevision: func(s string, i int64) clientv3.WatchChan { -// return watchChan -// }, -// loadWithRevisionAndVersions: func(s string) ([]string, []string, []int64, int64, error) { -// return []string{}, []string{}, []int64{}, 0, fmt.Errorf("error occurred") -// }, -// } -// mt = &metaTable{ -// client: client, -// indexBuildID2Meta: map[UniqueID]*Meta{}, -// etcdRevision: 0, -// lock: sync.RWMutex{}, -// } -// ic.metaTable = mt -// ic.loopWg.Add(1) -// watchChan <- clientv3.WatchResponse{CompactRevision: 10} -// assert.Panics(t, func() { -// ic.watchMetaLoop() -// }) -// ic.loopWg.Wait() -// }) -// -// t.Run("watch chan new meta success", func(t *testing.T) { -// ic.loopWg = sync.WaitGroup{} -// client = &mockETCDKV{ -// watchWithRevision: func(s string, i int64) clientv3.WatchChan { -// return watchChan -// }, -// loadWithRevisionAndVersions: func(s string) ([]string, []string, []int64, int64, error) { -// return []string{}, []string{}, []int64{}, 0, nil -// }, -// } -// mt = &metaTable{ -// client: client, -// indexBuildID2Meta: map[UniqueID]*Meta{}, -// etcdRevision: 0, -// lock: sync.RWMutex{}, -// } -// ic.metaTable = mt -// ic.loopWg.Add(1) -// watchChan <- clientv3.WatchResponse{CompactRevision: 10} -// go ic.watchMetaLoop() -// cancel() -// ic.loopWg.Wait() -// }) -//} -// -//func TestIndexCoord_GetComponentStates(t *testing.T) { -// n := &IndexCoord{} -// n.stateCode.Store(commonpb.StateCode_Healthy) -// resp, err := n.GetComponentStates(context.Background()) -// assert.NoError(t, err) -// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) -// assert.Equal(t, common.NotRegisteredID, resp.State.NodeID) -// n.session = &sessionutil.Session{} -// n.session.UpdateRegistered(true) -// resp, err = n.GetComponentStates(context.Background()) -// assert.NoError(t, err) -// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) -//} -// -//func TestIndexCoord_NotHealthy(t *testing.T) { -// ic := &IndexCoord{} -// ic.stateCode.Store(commonpb.StateCode_Abnormal) -// req := &indexpb.BuildIndexRequest{} -// resp, err := ic.BuildIndex(context.Background(), req) -// assert.Error(t, err) -// assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) -// -// req2 := &indexpb.DropIndexRequest{} -// status, err := ic.DropIndex(context.Background(), req2) -// assert.Nil(t, err) -// assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode) -// -// req3 := &indexpb.GetIndexStatesRequest{} -// resp2, err := ic.GetIndexStates(context.Background(), req3) -// assert.Nil(t, err) -// assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp2.Status.ErrorCode) -// -// req4 := &indexpb.GetIndexFilePathsRequest{ -// IndexBuildIDs: []UniqueID{1, 2}, -// } -// resp4, err := ic.GetIndexFilePaths(context.Background(), req4) -// assert.Nil(t, err) -// assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp4.Status.ErrorCode) -// -// req5 := &indexpb.RemoveIndexRequest{} -// resp5, err := ic.RemoveIndex(context.Background(), req5) -// assert.Nil(t, err) -// assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp5.GetErrorCode()) -//} -// -//func TestIndexCoord_GetIndexFilePaths(t *testing.T) { -// ic := &IndexCoord{ -// metaTable: &metaTable{ -// indexBuildID2Meta: map[UniqueID]*Meta{ -// 1: { -// indexMeta: &indexpb.IndexMeta{ -// IndexBuildID: 1, -// State: commonpb.IndexState_Finished, -// IndexFileKeys: []string{"indexFiles-1", "indexFiles-2"}, -// }, -// }, -// 2: { -// indexMeta: &indexpb.IndexMeta{ -// IndexBuildID: 2, -// State: commonpb.IndexState_Failed, -// }, -// }, -// }, -// }, -// } -// -// ic.stateCode.Store(commonpb.StateCode_Healthy) -// -// t.Run("GetIndexFilePaths success", func(t *testing.T) { -// resp, err := ic.GetIndexFilePaths(context.Background(), &indexpb.GetIndexFilePathsRequest{IndexBuildIDs: []UniqueID{1}}) -// assert.NoError(t, err) -// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) -// assert.Equal(t, 1, len(resp.FilePaths)) -// assert.ElementsMatch(t, resp.FilePaths[0].IndexFileKeys, []string{"indexFiles-1", "indexFiles-2"}) -// }) -// -// t.Run("GetIndexFilePaths failed", func(t *testing.T) { -// resp, err := ic.GetIndexFilePaths(context.Background(), &indexpb.GetIndexFilePathsRequest{IndexBuildIDs: []UniqueID{2}}) -// assert.NoError(t, err) -// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) -// assert.Equal(t, 0, len(resp.FilePaths[0].IndexFileKeys)) -// }) -// -// t.Run("set DataCoord with nil", func(t *testing.T) { -// err := ic.SetDataCoord(nil) -// assert.Error(t, err) -// }) -//} -// -//func Test_tryAcquireSegmentReferLock(t *testing.T) { -// ic := &IndexCoord{ -// session: &sessionutil.Session{ -// ServerID: 1, -// }, -// } -// dcm := &DataCoordMock{ -// Err: false, -// Fail: false, -// } -// cmm := &ChunkManagerMock{ -// Err: false, -// Fail: false, -// } -// -// ic.dataCoordClient = dcm -// ic.chunkManager = cmm -// -// t.Run("success", func(t *testing.T) { -// err := ic.tryAcquireSegmentReferLock(context.Background(), 1, 1, []UniqueID{1}) -// assert.Nil(t, err) -// }) -// -// t.Run("error", func(t *testing.T) { -// dcmE := &DataCoordMock{ -// Err: true, -// Fail: false, -// } -// ic.dataCoordClient = dcmE -// err := ic.tryAcquireSegmentReferLock(context.Background(), 1, 1, []UniqueID{1}) -// assert.Error(t, err) -// }) -// -// t.Run("Fail", func(t *testing.T) { -// dcmF := &DataCoordMock{ -// Err: false, -// Fail: true, -// } -// ic.dataCoordClient = dcmF -// err := ic.tryAcquireSegmentReferLock(context.Background(), 1, 1, []UniqueID{1}) -// assert.Error(t, err) -// }) -//} -// -//func Test_tryReleaseSegmentReferLock(t *testing.T) { -// ic := &IndexCoord{ -// session: &sessionutil.Session{ -// ServerID: 1, -// }, -// } -// dcm := &DataCoordMock{ -// Err: false, -// Fail: false, -// } -// -// ic.dataCoordClient = dcm -// -// t.Run("success", func(t *testing.T) { -// err := ic.tryReleaseSegmentReferLock(context.Background(), 1, 1) -// assert.NoError(t, err) -// }) -//} -// -//func TestIndexCoord_RemoveIndex(t *testing.T) { -// ic := &IndexCoord{ -// metaTable: &metaTable{}, -// indexBuilder: &indexBuilder{ -// notify: make(chan struct{}, 10), -// }, -// } -// ic.stateCode.Store(commonpb.StateCode_Healthy) -// status, err := ic.RemoveIndex(context.Background(), &indexpb.RemoveIndexRequest{BuildIDs: []UniqueID{0}}) -// assert.Nil(t, err) -// assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode()) -//} - func TestIndexCoord_pullSegmentInfo(t *testing.T) { t.Run("success", func(t *testing.T) { ic := &IndexCoord{ @@ -1410,3 +1123,117 @@ func TestIndexCoord_CreateIndex(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode()) }) } + +func TestIndexCoord_GetIndexState(t *testing.T) { + ic := &IndexCoord{ + metaTable: &metaTable{ + catalog: &indexcoord.Catalog{Txn: NewMockEtcdKV()}, + collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{}, + segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, + }, + } + ic.stateCode.Store(commonpb.StateCode_Healthy) + + req := &indexpb.GetIndexStateRequest{ + CollectionID: collID, + IndexName: "", + } + ctx := context.Background() + t.Run("no index", func(t *testing.T) { + resp, err := ic.GetIndexState(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) + }) + + err := ic.metaTable.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: collID, + FieldID: fieldID, + IndexID: indexID, + IndexName: "_default_idx_101", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: nil, + IsAutoIndex: false, + UserIndexParams: nil, + }) + assert.NoError(t, err) + err = ic.metaTable.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: collID, + FieldID: fieldID + 1, + IndexID: indexID + 1, + IndexName: "_default_idx_102", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: nil, + IsAutoIndex: false, + UserIndexParams: nil, + }) + assert.NoError(t, err) + t.Run("multiple indexes", func(t *testing.T) { + resp, err := ic.GetIndexState(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) + }) +} + +func TestIndexCoord_GetIndexBuildProgress(t *testing.T) { + ic := &IndexCoord{ + metaTable: &metaTable{ + catalog: &indexcoord.Catalog{Txn: NewMockEtcdKV()}, + collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{}, + segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, + }, + } + ic.stateCode.Store(commonpb.StateCode_Healthy) + + req := &indexpb.GetIndexBuildProgressRequest{ + CollectionID: collID, + IndexName: "", + } + ctx := context.Background() + + t.Run("no index", func(t *testing.T) { + resp, err := ic.GetIndexBuildProgress(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) + }) + + err := ic.metaTable.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: collID, + FieldID: fieldID, + IndexID: indexID, + IndexName: "_default_idx_101", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: nil, + IsAutoIndex: false, + UserIndexParams: nil, + }) + assert.NoError(t, err) + err = ic.metaTable.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: collID, + FieldID: fieldID + 1, + IndexID: indexID + 1, + IndexName: "_default_idx_102", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: nil, + IsAutoIndex: false, + UserIndexParams: nil, + }) + assert.NoError(t, err) + + t.Run("multiple indexes", func(t *testing.T) { + resp, err := ic.GetIndexBuildProgress(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) + }) +} diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index 945a965a48..f59b1b8bff 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -618,10 +618,6 @@ func (gibpt *getIndexBuildProgressTask) Execute(ctx context.Context) error { } gibpt.collectionID = collectionID - if gibpt.IndexName == "" { - gibpt.IndexName = Params.CommonCfg.DefaultIndexName - } - resp, err := gibpt.indexCoord.GetIndexBuildProgress(ctx, &indexpb.GetIndexBuildProgressRequest{ CollectionID: collectionID, IndexName: gibpt.IndexName, @@ -704,10 +700,6 @@ func (gist *getIndexStateTask) PreExecute(ctx context.Context) error { } func (gist *getIndexStateTask) Execute(ctx context.Context) error { - - if gist.IndexName == "" { - gist.IndexName = Params.CommonCfg.DefaultIndexName - } collectionID, err := globalMetaCache.GetCollectionID(ctx, gist.CollectionName) if err != nil { return err