fix: avoid creating multiple identical indexes by accident (#40180)

issue: https://github.com/milvus-io/milvus/issues/40163
pr: https://github.com/milvus-io/milvus/pull/40179
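
Root cause: `Server.CreateIndex` previously called `indexMeta.CanCreateIndex` and `indexMeta.CreateIndex` as two separately locked steps, so two concurrent requests for the same field could both pass the duplicate check and then both persist an identical index. This change moves the duplicate check inside `indexMeta.CreateIndex`, which now takes the `indexpb.CreateIndexRequest` plus a pre-allocated index ID and performs check-and-create under a single write lock; the allocated ID is used only when no equivalent index exists, otherwise the ID of the existing index is returned.

A minimal, self-contained sketch of the check-and-create-under-lock pattern (simplified, hypothetical types, not the actual Milvus structs):

```go
// Sketch only: toy types that mirror the shape of the fix, not Milvus code.
package main

import (
	"fmt"
	"sync"
)

type createIndexRequest struct {
	CollectionID int64
	FieldID      int64
	IndexName    string
}

type index struct {
	FieldID   int64
	IndexID   int64
	IndexName string
}

type indexMeta struct {
	sync.RWMutex
	indexes map[int64]map[int64]*index // collectionID -> indexID -> index
}

// canCreateIndex must be called with the lock held; it returns the ID of an
// equivalent existing index, or 0 when a new index may be created.
func (m *indexMeta) canCreateIndex(req *createIndexRequest) int64 {
	for _, idx := range m.indexes[req.CollectionID] {
		if idx.FieldID == req.FieldID && idx.IndexName == req.IndexName {
			return idx.IndexID
		}
	}
	return 0
}

// CreateIndex takes the request plus a pre-allocated ID and performs the
// duplicate check and the registration in one critical section. The allocated
// ID is used only when no equivalent index exists.
func (m *indexMeta) CreateIndex(req *createIndexRequest, allocatedIndexID int64) (int64, error) {
	m.Lock()
	defer m.Unlock()

	if existing := m.canCreateIndex(req); existing != 0 {
		// Idempotent: a racing caller gets the already-registered index ID back.
		return existing, nil
	}
	if m.indexes[req.CollectionID] == nil {
		m.indexes[req.CollectionID] = make(map[int64]*index)
	}
	m.indexes[req.CollectionID][allocatedIndexID] = &index{
		FieldID:   req.FieldID,
		IndexID:   allocatedIndexID,
		IndexName: req.IndexName,
	}
	return allocatedIndexID, nil
}

func main() {
	m := &indexMeta{indexes: make(map[int64]map[int64]*index)}
	req := &createIndexRequest{CollectionID: 1, FieldID: 2, IndexName: "_default_idx"}

	// Two concurrent CreateIndex calls for the same field, each with its own
	// freshly allocated ID: only one index ends up registered.
	var wg sync.WaitGroup
	ids := make([]int64, 2)
	for i := range ids {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ids[i], _ = m.CreateIndex(req, int64(100+i))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(m.indexes[1]) == 1, ids[0] == ids[1]) // true true
}
```

With the check and the write in one critical section, a racing caller simply gets back the index ID created by whichever request won, instead of registering a duplicate.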

---------

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
zhenshan.cao committed 2025-02-25 22:19:56 +08:00 via GitHub
parent eee98fd044
commit 27fb8d9512
7 changed files with 128 additions and 167 deletions

View File

@@ -660,13 +660,12 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
s.Run("collection has index, segment is not indexed", func() {
task := s.generateBasicTask(false)
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
index := &model.Index{
CollectionID: 1,
IndexID: 3,
}
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 1,
}
task.updateAndSaveTaskMeta(setResultSegments([]int64{10, 11}))
err := s.meta.indexMeta.CreateIndex(context.TODO(), index)
_, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3)
s.NoError(err)
s.False(task.Process())
@@ -676,11 +675,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
s.Run("collection has index, segment indexed", func() {
task := s.generateBasicTask(false)
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
index := &model.Index{
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 1,
IndexID: 3,
}
err := s.meta.indexMeta.CreateIndex(context.TODO(), index)
_, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3)
s.NoError(err)
s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{

View File

@@ -16,6 +16,7 @@ import (
mocks2 "github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
@@ -40,11 +41,11 @@ func TestGetQueryVChanPositionsRetrieveM2N(t *testing.T) {
},
},
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 1,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
require.NoError(t, err)
segArgs := []struct {
@@ -152,12 +153,12 @@ func TestGetQueryVChanPositions(t *testing.T) {
},
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
assert.NoError(t, err)
s1 := &datapb.SegmentInfo{
@@ -331,12 +332,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
ID: 0,
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
assert.NoError(t, err)
c := &datapb.SegmentInfo{
ID: 1,
@@ -401,12 +401,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
ID: 0,
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
assert.NoError(t, err)
a := &datapb.SegmentInfo{
ID: 99,
@@ -487,12 +486,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
ID: 0,
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
assert.NoError(t, err)
c := &datapb.SegmentInfo{
ID: 1,
@@ -597,12 +595,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
Partitions: []int64{0},
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
assert.NoError(t, err)
seg1 := &datapb.SegmentInfo{
ID: 1,
@@ -978,12 +975,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
Partitions: []int64{0},
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
assert.NoError(t, err)
seg1 := &datapb.SegmentInfo{
ID: 1,
@@ -1182,12 +1178,12 @@ func TestGetCurrentSegmentsView(t *testing.T) {
Partitions: []int64{0},
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
assert.NoError(t, err)
seg1 := &datapb.SegmentInfo{
ID: 1,

View File

@@ -326,10 +326,14 @@ func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool
return !notEq
}
// CanCreateIndex currently is used in Unittest
func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error) {
m.RLock()
defer m.RUnlock()
return m.canCreateIndex(req)
}
func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error) {
indexes, ok := m.indexes[req.CollectionID]
if !ok {
return 0, nil
@@ -384,23 +388,51 @@ func (m *indexMeta) HasSameReq(req *indexpb.CreateIndexRequest) (bool, UniqueID)
return false, 0
}
func (m *indexMeta) CreateIndex(ctx context.Context, index *model.Index) error {
log.Ctx(ctx).Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
func (m *indexMeta) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest, allocatedIndexID UniqueID) (UniqueID, error) {
m.Lock()
defer m.Unlock()
indexID, err := m.canCreateIndex(req)
if err != nil {
return indexID, err
}
if indexID == 0 {
indexID = allocatedIndexID
} else {
return indexID, nil
}
// exclude the mmap.enable param, because it will be conflicted with the index's mmap.enable param
typeParams := DeleteParams(req.GetTypeParams(), []string{common.MmapEnabledKey})
index := &model.Index{
CollectionID: req.GetCollectionID(),
FieldID: req.GetFieldID(),
IndexID: indexID,
IndexName: req.GetIndexName(),
TypeParams: typeParams,
IndexParams: req.GetIndexParams(),
CreateTime: req.GetTimestamp(),
IsAutoIndex: req.GetIsAutoIndex(),
UserIndexParams: req.GetUserIndexParams(),
}
if err := ValidateIndexParams(index); err != nil {
return indexID, err
}
log.Ctx(ctx).Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
if err := m.catalog.CreateIndex(ctx, index); err != nil {
log.Ctx(ctx).Error("meta update: CreateIndex save meta fail", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID),
zap.String("indexName", index.IndexName), zap.Error(err))
return err
return indexID, err
}
m.updateCollectionIndex(index)
log.Ctx(ctx).Info("meta update: CreateIndex success", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
return nil
return indexID, nil
}
func (m *indexMeta) AlterIndex(ctx context.Context, indexes ...*model.Index) error {
@@ -553,10 +585,7 @@ func (m *indexMeta) GetIndexesForCollection(collID UniqueID, indexName string) [
return indexInfos
}
func (m *indexMeta) GetFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index {
m.RLock()
defer m.RUnlock()
func (m *indexMeta) getFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index {
indexInfos := make([]*model.Index, 0)
for _, index := range m.indexes[collID] {
if index.IsDeleted || index.FieldID != fieldID {
@@ -569,6 +598,12 @@ func (m *indexMeta) GetFieldIndexes(collID, fieldID UniqueID, indexName string)
return indexInfos
}
func (m *indexMeta) GetFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index {
m.RLock()
defer m.RUnlock()
return m.getFieldIndexes(collID, fieldID, indexName)
}
// MarkIndexAsDeleted will mark the corresponding index as deleted, and recycleUnusedIndexFiles will recycle these tasks.
func (m *indexMeta) MarkIndexAsDeleted(ctx context.Context, collID UniqueID, indexIDs []UniqueID) error {
log.Ctx(ctx).Info("IndexCoord metaTable MarkIndexAsDeleted", zap.Int64("collectionID", collID),

View File

@@ -269,21 +269,8 @@ func TestMeta_CanCreateIndex(t *testing.T) {
tmpIndexID, err := m.CanCreateIndex(req)
assert.NoError(t, err)
assert.Equal(t, int64(0), tmpIndexID)
index := &model.Index{
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: userIndexParams,
}
err = m.CreateIndex(context.TODO(), index)
_, err = m.CreateIndex(context.TODO(), req, indexID)
assert.NoError(t, err)
tmpIndexID, err = m.CanCreateIndex(req)
@@ -458,21 +445,21 @@ func TestMeta_CreateIndex(t *testing.T) {
Value: "FLAT",
},
}
index := &model.Index{
TenantID: "",
CollectionID: 1,
FieldID: 2,
IndexID: 3,
IndexName: "_default_idx",
IsDeleted: false,
CreateTime: 12,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
typeParams := []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
}
req := &indexpb.CreateIndexRequest{
CollectionID: 1,
FieldID: 2,
IndexName: indexName,
TypeParams: typeParams,
IndexParams: indexParams,
Timestamp: 12,
IsAutoIndex: false,
UserIndexParams: indexParams,
}
@@ -485,7 +472,7 @@ func TestMeta_CreateIndex(t *testing.T) {
).Return(nil)
m := newSegmentIndexMeta(sc)
err := m.CreateIndex(context.TODO(), index)
_, err := m.CreateIndex(context.TODO(), req, 3)
assert.NoError(t, err)
})
@@ -497,7 +484,7 @@ func TestMeta_CreateIndex(t *testing.T) {
).Return(errors.New("fail"))
m := newSegmentIndexMeta(ec)
err := m.CreateIndex(context.TODO(), index)
_, err := m.CreateIndex(context.TODO(), req, 4)
assert.Error(t, err)
})
}

View File

@@ -28,7 +28,6 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/internal/util/vecindexmgr"
"github.com/milvus-io/milvus/pkg/v2/common"
pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
@@ -215,58 +214,24 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
}
}
indexID, err := s.meta.indexMeta.CanCreateIndex(req)
if err != nil {
if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(GetIndexType(req.IndexParams)) && !s.indexNodeManager.ClientSupportDisk() {
errMsg := "all IndexNodes do not support disk indexes, please verify"
log.Warn(errMsg)
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
return merr.Status(merr.WrapErrIndexNotSupported(GetIndexType(req.IndexParams))), nil
}
// merge with previous params because create index would not pass mmap params
indexes := s.meta.indexMeta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName())
if len(indexes) == 1 {
req.UserIndexParams = UpdateParams(indexes[0], indexes[0].UserIndexParams, req.GetUserIndexParams())
req.IndexParams = UpdateParams(indexes[0], indexes[0].IndexParams, req.GetIndexParams())
}
if indexID == 0 {
indexID, err = s.allocator.AllocID(ctx)
if err != nil {
log.Warn("failed to alloc indexID", zap.Error(err))
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(GetIndexType(req.IndexParams)) && !s.indexNodeManager.ClientSupportDisk() {
errMsg := "all IndexNodes do not support disk indexes, please verify"
log.Warn(errMsg)
err = merr.WrapErrIndexNotSupported(GetIndexType(req.IndexParams))
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
}
// exclude the mmap.enable param, because it will be conflict with the index's mmap.enable param
typeParams := DeleteParams(req.GetTypeParams(), []string{common.MmapEnabledKey})
index := &model.Index{
CollectionID: req.GetCollectionID(),
FieldID: req.GetFieldID(),
IndexID: indexID,
IndexName: req.GetIndexName(),
TypeParams: typeParams,
IndexParams: req.GetIndexParams(),
CreateTime: req.GetTimestamp(),
IsAutoIndex: req.GetIsAutoIndex(),
UserIndexParams: req.GetUserIndexParams(),
}
if err := ValidateIndexParams(index); err != nil {
allocatedIndexID, err := s.allocator.AllocID(ctx)
if err != nil {
log.Warn("failed to alloc indexID", zap.Error(err))
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
// Get flushed segments and create index
err = s.meta.indexMeta.CreateIndex(ctx, index)
indexID, err := s.meta.indexMeta.CreateIndex(ctx, req, allocatedIndexID)
if err != nil {
log.Error("CreateIndex fail",
log.Warn("CreateIndex fail",
zap.Int64("fieldID", req.GetFieldID()), zap.String("indexName", req.GetIndexName()), zap.Error(err))
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil

View File

@@ -55,6 +55,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
@@ -1230,13 +1231,11 @@ func TestGetRecoveryInfo(t *testing.T) {
})
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "",
})
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
assert.NoError(t, err)
seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed)
@@ -1453,13 +1452,11 @@ func TestGetRecoveryInfo(t *testing.T) {
err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment))
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "",
})
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
assert.NoError(t, err)
err = svr.meta.indexMeta.AddSegmentIndex(context.TODO(), &model.SegmentIndex{
SegmentID: segment.ID,
@@ -1618,19 +1615,12 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.NoError(t, err)
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5))
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "_default_idx_2",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
})
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexName: "_default_idx_2",
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
assert.NoError(t, err)
svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
SegmentID: seg4.ID,

View File

@@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
@@ -852,13 +853,11 @@ func TestGetRecoveryInfoV2(t *testing.T) {
})
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "",
})
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
assert.NoError(t, err)
seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed)
@@ -1077,13 +1076,11 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment))
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "",
})
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
assert.NoError(t, err)
err = svr.meta.indexMeta.AddSegmentIndex(context.TODO(), &model.SegmentIndex{
SegmentID: segment.ID,
@@ -1243,19 +1240,12 @@ func TestGetRecoveryInfoV2(t *testing.T) {
assert.NoError(t, err)
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5))
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "_default_idx_2",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
})
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexName: "_default_idx_2",
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
assert.NoError(t, err)
svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
SegmentID: seg4.ID,