fix: create multiple identical indexes by accident (#40179)

issue: https://github.com/milvus-io/milvus/issues/40163

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/41097/head
zhenshan.cao 2025-04-08 15:06:25 +08:00 committed by GitHub
parent 6f17720e4e
commit 758cf29e77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 121 additions and 157 deletions

2
go.mod
View File

@ -18,7 +18,7 @@ require (
github.com/gin-gonic/gin v1.9.1
github.com/go-playground/validator/v10 v10.14.0
github.com/gofrs/flock v0.8.1
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/protobuf v1.5.4
github.com/google/btree v1.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9

View File

@ -661,13 +661,12 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
s.Run("collection has index, segment is not indexed", func() {
task := s.generateBasicTask(false)
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
index := &model.Index{
CollectionID: 1,
IndexID: 3,
}
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 1,
}
task.updateAndSaveTaskMeta(setResultSegments([]int64{10, 11}))
err := s.meta.indexMeta.CreateIndex(context.TODO(), index)
_, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3, false)
s.NoError(err)
s.False(task.Process())
@ -677,11 +676,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
s.Run("collection has index, segment indexed", func() {
task := s.generateBasicTask(false)
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
index := &model.Index{
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 1,
IndexID: 3,
}
err := s.meta.indexMeta.CreateIndex(context.TODO(), index)
_, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3, false)
s.NoError(err)
s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{

View File

@ -16,6 +16,7 @@ import (
mocks2 "github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
@ -40,11 +41,11 @@ func TestGetQueryVChanPositionsRetrieveM2N(t *testing.T) {
},
},
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 1,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
require.NoError(t, err)
segArgs := []struct {
@ -152,12 +153,12 @@ func TestGetQueryVChanPositions(t *testing.T) {
},
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
s1 := &datapb.SegmentInfo{
@ -333,12 +334,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
ID: 0,
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
c := &datapb.SegmentInfo{
ID: 1,
@ -403,12 +403,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
ID: 0,
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
a := &datapb.SegmentInfo{
ID: 99,
@ -489,12 +488,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
ID: 0,
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
c := &datapb.SegmentInfo{
ID: 1,
@ -599,12 +597,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
Partitions: []int64{0},
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
seg1 := &datapb.SegmentInfo{
ID: 1,
@ -980,12 +977,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
Partitions: []int64{0},
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
seg1 := &datapb.SegmentInfo{
ID: 1,
@ -1184,12 +1180,12 @@ func TestGetCurrentSegmentsView(t *testing.T) {
Partitions: []int64{0},
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
seg1 := &datapb.SegmentInfo{
ID: 1,

View File

@ -356,7 +356,10 @@ func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool
func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest, isJson bool) (UniqueID, error) {
m.fieldIndexLock.RLock()
defer m.fieldIndexLock.RUnlock()
return m.canCreateIndex(req, isJson)
}
func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest, isJson bool) (UniqueID, error) {
indexes, ok := m.indexes[req.CollectionID]
if !ok {
return 0, nil
@ -425,23 +428,51 @@ func (m *indexMeta) HasSameReq(req *indexpb.CreateIndexRequest) (bool, UniqueID)
return false, 0
}
func (m *indexMeta) CreateIndex(ctx context.Context, index *model.Index) error {
log.Ctx(ctx).Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
func (m *indexMeta) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest, allocatedIndexID typeutil.UniqueID, isJson bool) (UniqueID, error) {
m.fieldIndexLock.Lock()
defer m.fieldIndexLock.Unlock()
indexID, err := m.canCreateIndex(req, isJson)
if err != nil {
return indexID, err
}
if indexID == 0 {
indexID = allocatedIndexID
} else {
return indexID, nil
}
// exclude the mmap.enable param, because it would conflict with the index's mmap.enable param
typeParams := DeleteParams(req.GetTypeParams(), []string{common.MmapEnabledKey})
index := &model.Index{
CollectionID: req.GetCollectionID(),
FieldID: req.GetFieldID(),
IndexID: indexID,
IndexName: req.GetIndexName(),
TypeParams: typeParams,
IndexParams: req.GetIndexParams(),
CreateTime: req.GetTimestamp(),
IsAutoIndex: req.GetIsAutoIndex(),
UserIndexParams: req.GetUserIndexParams(),
}
if err := ValidateIndexParams(index); err != nil {
return indexID, err
}
log.Ctx(ctx).Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
if err := m.catalog.CreateIndex(ctx, index); err != nil {
log.Ctx(ctx).Error("meta update: CreateIndex save meta fail", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID),
zap.String("indexName", index.IndexName), zap.Error(err))
return err
return indexID, err
}
m.updateCollectionIndex(index)
log.Ctx(ctx).Info("meta update: CreateIndex success", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
return nil
return indexID, nil
}
func (m *indexMeta) AlterIndex(ctx context.Context, indexes ...*model.Index) error {

View File

@ -270,21 +270,8 @@ func TestMeta_CanCreateIndex(t *testing.T) {
tmpIndexID, err := m.CanCreateIndex(req, false)
assert.NoError(t, err)
assert.Equal(t, int64(0), tmpIndexID)
index := &model.Index{
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: userIndexParams,
}
err = m.CreateIndex(context.TODO(), index)
indexID, err = m.CreateIndex(context.TODO(), req, indexID, false)
assert.NoError(t, err)
tmpIndexID, err = m.CanCreateIndex(req, false)
@ -453,31 +440,34 @@ func newSegmentIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta {
}
func TestMeta_CreateIndex(t *testing.T) {
indexParams := []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "FLAT",
},
}
index := &model.Index{
TenantID: "",
CollectionID: 1,
FieldID: 2,
IndexID: 3,
IndexName: "_default_idx",
IsDeleted: false,
CreateTime: 12,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
typeParams := []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
}
req := &indexpb.CreateIndexRequest{
CollectionID: 1,
FieldID: 2,
IndexName: indexName,
TypeParams: typeParams,
IndexParams: indexParams,
Timestamp: 12,
IsAutoIndex: false,
UserIndexParams: indexParams,
}
allocatedID := UniqueID(3)
t.Run("success", func(t *testing.T) {
sc := catalogmocks.NewDataCoordCatalog(t)
sc.On("CreateIndex",
@ -486,7 +476,7 @@ func TestMeta_CreateIndex(t *testing.T) {
).Return(nil)
m := newSegmentIndexMeta(sc)
err := m.CreateIndex(context.TODO(), index)
_, err := m.CreateIndex(context.TODO(), req, allocatedID, false)
assert.NoError(t, err)
})
@ -498,7 +488,7 @@ func TestMeta_CreateIndex(t *testing.T) {
).Return(errors.New("fail"))
m := newSegmentIndexMeta(ec)
err := m.CreateIndex(context.TODO(), index)
_, err := m.CreateIndex(context.TODO(), req, 4, false)
assert.Error(t, err)
})
}

View File

@ -319,49 +319,15 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
}
}
indexID, err := s.meta.indexMeta.CanCreateIndex(req, isJson)
allocateIndexID, err := s.allocator.AllocID(ctx)
if err != nil {
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
// merge with previous params because create index would not pass mmap params
indexes := s.meta.indexMeta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName())
if len(indexes) == 1 {
req.UserIndexParams = UpdateParams(indexes[0], indexes[0].UserIndexParams, req.GetUserIndexParams())
req.IndexParams = UpdateParams(indexes[0], indexes[0].IndexParams, req.GetIndexParams())
}
if indexID == 0 {
indexID, err = s.allocator.AllocID(ctx)
if err != nil {
log.Warn("failed to alloc indexID", zap.Error(err))
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
}
// exclude the mmap.enable param, because it will be conflict with the index's mmap.enable param
typeParams := DeleteParams(req.GetTypeParams(), []string{common.MmapEnabledKey})
index := &model.Index{
CollectionID: req.GetCollectionID(),
FieldID: req.GetFieldID(),
IndexID: indexID,
IndexName: req.GetIndexName(),
TypeParams: typeParams,
IndexParams: req.GetIndexParams(),
CreateTime: req.GetTimestamp(),
IsAutoIndex: req.GetIsAutoIndex(),
UserIndexParams: req.GetUserIndexParams(),
}
if err := ValidateIndexParams(index); err != nil {
log.Warn("failed to alloc indexID", zap.Error(err))
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
// Get flushed segments and create index
err = s.meta.indexMeta.CreateIndex(ctx, index)
indexID, err := s.meta.indexMeta.CreateIndex(ctx, req, allocateIndexID, isJson)
if err != nil {
log.Error("CreateIndex fail",
zap.Int64("fieldID", req.GetFieldID()), zap.String("indexName", req.GetIndexName()), zap.Error(err))

View File

@ -55,6 +55,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
@ -1230,13 +1231,12 @@ func TestGetRecoveryInfo(t *testing.T) {
})
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "",
})
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed)
@ -1453,13 +1453,12 @@ func TestGetRecoveryInfo(t *testing.T) {
err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment))
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "",
})
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
err = svr.meta.indexMeta.AddSegmentIndex(context.TODO(), &model.SegmentIndex{
SegmentID: segment.ID,
@ -1618,19 +1617,12 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.NoError(t, err)
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5))
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "_default_idx_2",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
})
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexName: "_default_idx_2",
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
SegmentID: seg4.ID,

View File

@ -2,6 +2,7 @@ package datacoord
import (
"context"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"testing"
"time"
@ -867,13 +868,11 @@ func TestGetRecoveryInfoV2(t *testing.T) {
})
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "",
})
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed)
@ -1092,13 +1091,12 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment))
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "",
})
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
err = svr.meta.indexMeta.AddSegmentIndex(context.TODO(), &model.SegmentIndex{
SegmentID: segment.ID,
@ -1258,19 +1256,12 @@ func TestGetRecoveryInfoV2(t *testing.T) {
assert.NoError(t, err)
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5))
assert.NoError(t, err)
err = svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
IndexID: 0,
IndexName: "_default_idx_2",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
})
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 0,
FieldID: 2,
IndexName: "_default_idx_2",
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
SegmentID: seg4.ID,