diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index a5ff710ada..593ce1c0f7 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -660,13 +660,12 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() { s.Run("collection has index, segment is not indexed", func() { task := s.generateBasicTask(false) task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing)) - index := &model.Index{ - CollectionID: 1, - IndexID: 3, - } + indexReq := &indexpb.CreateIndexRequest{ + CollectionID: 1, + } task.updateAndSaveTaskMeta(setResultSegments([]int64{10, 11})) - err := s.meta.indexMeta.CreateIndex(context.TODO(), index) + _, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3) s.NoError(err) s.False(task.Process()) @@ -676,11 +675,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() { s.Run("collection has index, segment indexed", func() { task := s.generateBasicTask(false) task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing)) - index := &model.Index{ + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 1, - IndexID: 3, } - err := s.meta.indexMeta.CreateIndex(context.TODO(), index) + _, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3) s.NoError(err) s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{ diff --git a/internal/datacoord/handler_test.go b/internal/datacoord/handler_test.go index 3aaf7a78f4..1629c8893e 100644 --- a/internal/datacoord/handler_test.go +++ b/internal/datacoord/handler_test.go @@ -16,6 +16,7 @@ import ( mocks2 "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/v2/proto/workerpb" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" @@ -40,11 +41,11 @@ func TestGetQueryVChanPositionsRetrieveM2N(t *testing.T) { }, }, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 1, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1) require.NoError(t, err) segArgs := []struct { @@ -152,12 +153,12 @@ func TestGetQueryVChanPositions(t *testing.T) { }, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1) assert.NoError(t, err) s1 := &datapb.SegmentInfo{ @@ -331,12 +332,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { ID: 0, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1) assert.NoError(t, err) c := &datapb.SegmentInfo{ ID: 1, @@ -401,12 +401,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { ID: 0, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1) assert.NoError(t, err) a := &datapb.SegmentInfo{ ID: 99, @@ -487,12 +486,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { ID: 0, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1) assert.NoError(t, err) c := &datapb.SegmentInfo{ ID: 1, @@ -597,12 +595,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { Partitions: []int64{0}, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1) assert.NoError(t, err) seg1 := &datapb.SegmentInfo{ ID: 1, @@ -978,12 +975,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { Partitions: []int64{0}, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1) assert.NoError(t, err) seg1 := &datapb.SegmentInfo{ ID: 1, @@ -1182,12 +1178,12 @@ func TestGetCurrentSegmentsView(t *testing.T) { Partitions: []int64{0}, Schema: schema, }) - err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 1, - }) + } + _, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1) assert.NoError(t, err) seg1 := &datapb.SegmentInfo{ ID: 1, diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 6762bee360..33f075747b 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -326,10 +326,14 @@ func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool return !notEq } +// CanCreateIndex currently is used in Unittest func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error) { m.RLock() defer m.RUnlock() + return m.canCreateIndex(req) +} +func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error) { indexes, ok := m.indexes[req.CollectionID] if !ok { return 0, nil @@ -384,23 +388,51 @@ func (m *indexMeta) HasSameReq(req *indexpb.CreateIndexRequest) (bool, UniqueID) return false, 0 } -func (m *indexMeta) CreateIndex(ctx context.Context, index *model.Index) error { - log.Ctx(ctx).Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID), - zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName)) +func (m *indexMeta) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest, allocatedIndexID UniqueID) (UniqueID, error) { m.Lock() defer m.Unlock() + indexID, err := m.canCreateIndex(req) + if err != nil { + return indexID, err + } + + if indexID == 0 { + indexID = allocatedIndexID + } else { + return indexID, nil + } + + // exclude the mmap.enable param, because it will be conflicted with the index's mmap.enable param + typeParams := DeleteParams(req.GetTypeParams(), []string{common.MmapEnabledKey}) + index := &model.Index{ + CollectionID: req.GetCollectionID(), + FieldID: req.GetFieldID(), + IndexID: indexID, + IndexName: req.GetIndexName(), + TypeParams: typeParams, + IndexParams: req.GetIndexParams(), + CreateTime: req.GetTimestamp(), + IsAutoIndex: req.GetIsAutoIndex(), + UserIndexParams: req.GetUserIndexParams(), + } + if err := ValidateIndexParams(index); err != nil { + return indexID, err + } + log.Ctx(ctx).Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID), + zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName)) + if err := m.catalog.CreateIndex(ctx, index); err != nil { log.Ctx(ctx).Error("meta update: CreateIndex save meta fail", zap.Int64("collectionID", index.CollectionID), zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName), zap.Error(err)) - return err + return indexID, err } m.updateCollectionIndex(index) log.Ctx(ctx).Info("meta update: CreateIndex success", zap.Int64("collectionID", index.CollectionID), zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName)) - return nil + return indexID, nil } func (m *indexMeta) AlterIndex(ctx context.Context, indexes ...*model.Index) error { @@ -553,10 +585,7 @@ func (m *indexMeta) GetIndexesForCollection(collID UniqueID, indexName string) [ return indexInfos } -func (m *indexMeta) GetFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index { - m.RLock() - defer m.RUnlock() - +func (m *indexMeta) getFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index { indexInfos := make([]*model.Index, 0) for _, index := range m.indexes[collID] { if index.IsDeleted || index.FieldID != fieldID { @@ -569,6 +598,12 @@ func (m *indexMeta) GetFieldIndexes(collID, fieldID UniqueID, indexName string) return indexInfos } +func (m *indexMeta) GetFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index { + m.RLock() + defer m.RUnlock() + return m.getFieldIndexes(collID, fieldID, indexName) +} + // MarkIndexAsDeleted will mark the corresponding index as deleted, and recycleUnusedIndexFiles will recycle these tasks. func (m *indexMeta) MarkIndexAsDeleted(ctx context.Context, collID UniqueID, indexIDs []UniqueID) error { log.Ctx(ctx).Info("IndexCoord metaTable MarkIndexAsDeleted", zap.Int64("collectionID", collID), diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index d8ffca36b8..9f46560eb2 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -269,21 +269,8 @@ func TestMeta_CanCreateIndex(t *testing.T) { tmpIndexID, err := m.CanCreateIndex(req) assert.NoError(t, err) assert.Equal(t, int64(0), tmpIndexID) - index := &model.Index{ - TenantID: "", - CollectionID: collID, - FieldID: fieldID, - IndexID: indexID, - IndexName: indexName, - IsDeleted: false, - CreateTime: 0, - TypeParams: typeParams, - IndexParams: indexParams, - IsAutoIndex: false, - UserIndexParams: userIndexParams, - } - err = m.CreateIndex(context.TODO(), index) + _, err = m.CreateIndex(context.TODO(), req, indexID) assert.NoError(t, err) tmpIndexID, err = m.CanCreateIndex(req) @@ -458,21 +445,21 @@ func TestMeta_CreateIndex(t *testing.T) { Value: "FLAT", }, } - index := &model.Index{ - TenantID: "", - CollectionID: 1, - FieldID: 2, - IndexID: 3, - IndexName: "_default_idx", - IsDeleted: false, - CreateTime: 12, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, + + typeParams := []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", }, + } + + req := &indexpb.CreateIndexRequest{ + CollectionID: 1, + FieldID: 2, + IndexName: indexName, + TypeParams: typeParams, IndexParams: indexParams, + Timestamp: 12, IsAutoIndex: false, UserIndexParams: indexParams, } @@ -485,7 +472,7 @@ func TestMeta_CreateIndex(t *testing.T) { ).Return(nil) m := newSegmentIndexMeta(sc) - err := m.CreateIndex(context.TODO(), index) + _, err := m.CreateIndex(context.TODO(), req, 3) assert.NoError(t, err) }) @@ -497,7 +484,7 @@ func TestMeta_CreateIndex(t *testing.T) { ).Return(errors.New("fail")) m := newSegmentIndexMeta(ec) - err := m.CreateIndex(context.TODO(), index) + _, err := m.CreateIndex(context.TODO(), req, 4) assert.Error(t, err) }) } diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 66d2652409..dbeda871f6 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -28,7 +28,6 @@ import ( "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/util/indexparamcheck" "github.com/milvus-io/milvus/internal/util/vecindexmgr" - "github.com/milvus-io/milvus/pkg/v2/common" pkgcommon "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" @@ -215,58 +214,24 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques } } - indexID, err := s.meta.indexMeta.CanCreateIndex(req) - if err != nil { + if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(GetIndexType(req.IndexParams)) && !s.indexNodeManager.ClientSupportDisk() { + errMsg := "all IndexNodes do not support disk indexes, please verify" + log.Warn(errMsg) metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() - return merr.Status(err), nil + return merr.Status(merr.WrapErrIndexNotSupported(GetIndexType(req.IndexParams))), nil } - // merge with previous params because create index would not pass mmap params - indexes := s.meta.indexMeta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName()) - if len(indexes) == 1 { - req.UserIndexParams = UpdateParams(indexes[0], indexes[0].UserIndexParams, req.GetUserIndexParams()) - req.IndexParams = UpdateParams(indexes[0], indexes[0].IndexParams, req.GetIndexParams()) - } - - if indexID == 0 { - indexID, err = s.allocator.AllocID(ctx) - if err != nil { - log.Warn("failed to alloc indexID", zap.Error(err)) - metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() - return merr.Status(err), nil - } - if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(GetIndexType(req.IndexParams)) && !s.indexNodeManager.ClientSupportDisk() { - errMsg := "all IndexNodes do not support disk indexes, please verify" - log.Warn(errMsg) - err = merr.WrapErrIndexNotSupported(GetIndexType(req.IndexParams)) - metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() - return merr.Status(err), nil - } - } - // exclude the mmap.enable param, because it will be conflict with the index's mmap.enable param - typeParams := DeleteParams(req.GetTypeParams(), []string{common.MmapEnabledKey}) - - index := &model.Index{ - CollectionID: req.GetCollectionID(), - FieldID: req.GetFieldID(), - IndexID: indexID, - IndexName: req.GetIndexName(), - TypeParams: typeParams, - IndexParams: req.GetIndexParams(), - CreateTime: req.GetTimestamp(), - IsAutoIndex: req.GetIsAutoIndex(), - UserIndexParams: req.GetUserIndexParams(), - } - - if err := ValidateIndexParams(index); err != nil { + allocatedIndexID, err := s.allocator.AllocID(ctx) + if err != nil { + log.Warn("failed to alloc indexID", zap.Error(err)) metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() return merr.Status(err), nil } // Get flushed segments and create index - err = s.meta.indexMeta.CreateIndex(ctx, index) + indexID, err := s.meta.indexMeta.CreateIndex(ctx, req, allocatedIndexID) if err != nil { - log.Error("CreateIndex fail", + log.Warn("CreateIndex fail", zap.Int64("fieldID", req.GetFieldID()), zap.String("indexName", req.GetIndexName()), zap.Error(err)) metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() return merr.Status(err), nil diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 5ae91b005a..4f031ff32c 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -55,6 +55,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" "github.com/milvus-io/milvus/pkg/v2/proto/workerpb" "github.com/milvus-io/milvus/pkg/v2/util/etcd" @@ -1230,13 +1231,11 @@ func TestGetRecoveryInfo(t *testing.T) { }) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 0, - IndexName: "", - }) + } + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0) assert.NoError(t, err) seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed) @@ -1453,13 +1452,11 @@ func TestGetRecoveryInfo(t *testing.T) { err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment)) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 0, - IndexName: "", - }) + } + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0) assert.NoError(t, err) err = svr.meta.indexMeta.AddSegmentIndex(context.TODO(), &model.SegmentIndex{ SegmentID: segment.ID, @@ -1618,19 +1615,12 @@ func TestGetRecoveryInfo(t *testing.T) { assert.NoError(t, err) err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5)) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", - CollectionID: 0, - FieldID: 2, - IndexID: 0, - IndexName: "_default_idx_2", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: nil, - IsAutoIndex: false, - UserIndexParams: nil, - }) + indexReq := &indexpb.CreateIndexRequest{ + CollectionID: 0, + FieldID: 2, + IndexName: "_default_idx_2", + } + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0) assert.NoError(t, err) svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{ SegmentID: seg4.ID, diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index c40a275cbb..e2ec8c8037 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" "github.com/milvus-io/milvus/pkg/v2/proto/workerpb" "github.com/milvus-io/milvus/pkg/v2/util/merr" @@ -852,13 +853,11 @@ func TestGetRecoveryInfoV2(t *testing.T) { }) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 0, - IndexName: "", - }) + } + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0) assert.NoError(t, err) seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed) @@ -1077,13 +1076,11 @@ func TestGetRecoveryInfoV2(t *testing.T) { err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment)) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", + indexReq := &indexpb.CreateIndexRequest{ CollectionID: 0, FieldID: 2, - IndexID: 0, - IndexName: "", - }) + } + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0) assert.NoError(t, err) err = svr.meta.indexMeta.AddSegmentIndex(context.TODO(), &model.SegmentIndex{ SegmentID: segment.ID, @@ -1243,19 +1240,12 @@ func TestGetRecoveryInfoV2(t *testing.T) { assert.NoError(t, err) err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5)) assert.NoError(t, err) - err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ - TenantID: "", - CollectionID: 0, - FieldID: 2, - IndexID: 0, - IndexName: "_default_idx_2", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: nil, - IsAutoIndex: false, - UserIndexParams: nil, - }) + indexReq := &indexpb.CreateIndexRequest{ + CollectionID: 0, + FieldID: 2, + IndexName: "_default_idx_2", + } + _, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0) assert.NoError(t, err) svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{ SegmentID: seg4.ID,