mirror of https://github.com/milvus-io/milvus.git
fix: Set the default index name to the name of the existing index (#29275)
issue: #29269 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>pull/29356/head
parent
7d32861d53
commit
1b4d2674b3
|
@ -345,6 +345,22 @@ func (m *meta) GetIndexesForCollection(collID UniqueID, indexName string) []*mod
|
|||
return indexInfos
|
||||
}
|
||||
|
||||
func (m *meta) GetFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
indexInfos := make([]*model.Index, 0)
|
||||
for _, index := range m.indexes[collID] {
|
||||
if index.IsDeleted || index.FieldID != fieldID {
|
||||
continue
|
||||
}
|
||||
if indexName == "" || indexName == index.IndexName {
|
||||
indexInfos = append(indexInfos, model.CloneIndex(index))
|
||||
}
|
||||
}
|
||||
return indexInfos
|
||||
}
|
||||
|
||||
// MarkIndexAsDeleted will mark the corresponding index as deleted, and recycleUnusedIndexFiles will recycle these tasks.
|
||||
func (m *meta) MarkIndexAsDeleted(collID UniqueID, indexIDs []UniqueID) error {
|
||||
log.Info("IndexCoord metaTable MarkIndexAsDeleted", zap.Int64("collectionID", collID),
|
||||
|
|
|
@ -1284,3 +1284,55 @@ func TestMeta_DeleteTask_Error(t *testing.T) {
|
|||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMeta_GetFieldIndexes(t *testing.T) {
|
||||
m := &meta{
|
||||
indexes: map[UniqueID]map[UniqueID]*model.Index{
|
||||
collID: {
|
||||
indexID: {
|
||||
TenantID: "",
|
||||
CollectionID: collID,
|
||||
FieldID: fieldID,
|
||||
IndexID: indexID,
|
||||
IndexName: indexName,
|
||||
IsDeleted: true,
|
||||
CreateTime: 0,
|
||||
TypeParams: nil,
|
||||
IndexParams: nil,
|
||||
IsAutoIndex: false,
|
||||
UserIndexParams: nil,
|
||||
},
|
||||
indexID + 1: {
|
||||
TenantID: "",
|
||||
CollectionID: collID,
|
||||
FieldID: fieldID,
|
||||
IndexID: indexID + 1,
|
||||
IndexName: indexName,
|
||||
IsDeleted: false,
|
||||
CreateTime: 0,
|
||||
TypeParams: nil,
|
||||
IndexParams: nil,
|
||||
IsAutoIndex: false,
|
||||
UserIndexParams: nil,
|
||||
},
|
||||
indexID + 2: {
|
||||
TenantID: "",
|
||||
CollectionID: collID,
|
||||
FieldID: fieldID + 2,
|
||||
IndexID: indexID + 2,
|
||||
IndexName: indexName + "2",
|
||||
IsDeleted: false,
|
||||
CreateTime: 0,
|
||||
TypeParams: nil,
|
||||
IndexParams: nil,
|
||||
IsAutoIndex: false,
|
||||
UserIndexParams: nil,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
indexes := m.GetFieldIndexes(collID, fieldID, "")
|
||||
assert.Equal(t, 1, len(indexes))
|
||||
assert.Equal(t, indexName, indexes[0].IndexName)
|
||||
}
|
||||
|
|
|
@ -132,6 +132,20 @@ func (s *Server) createIndexForSegmentLoop(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Server) getFieldNameByID(ctx context.Context, collID, fieldID int64) (string, error) {
|
||||
resp, err := s.broker.DescribeCollectionInternal(ctx, collID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
for _, field := range resp.GetSchema().GetFields() {
|
||||
if field.FieldID == fieldID {
|
||||
return field.Name, nil
|
||||
}
|
||||
}
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// CreateIndex create an index on collection.
|
||||
// Index building is asynchronous, so when an index building request comes, an IndexID is assigned to the task and
|
||||
// will get all flushed segments from DataCoord and record tasks with these segments. The background process
|
||||
|
@ -152,6 +166,20 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
|
|||
}
|
||||
metrics.IndexRequestCounter.WithLabelValues(metrics.TotalLabel).Inc()
|
||||
|
||||
if req.GetIndexName() == "" {
|
||||
indexes := s.meta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName())
|
||||
if len(indexes) == 0 {
|
||||
fieldName, err := s.getFieldNameByID(ctx, req.GetCollectionID(), req.GetFieldID())
|
||||
if err != nil {
|
||||
log.Warn("get field name from schema failed", zap.Int64("fieldID", req.GetFieldID()))
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
req.IndexName = fieldName
|
||||
} else if len(indexes) == 1 {
|
||||
req.IndexName = indexes[0].IndexName
|
||||
}
|
||||
}
|
||||
|
||||
indexID, err := s.meta.CanCreateIndex(req)
|
||||
if err != nil {
|
||||
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
|
||||
|
|
|
@ -18,6 +18,7 @@ package datacoord
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -26,11 +27,15 @@ import (
|
|||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/broker"
|
||||
mockkv "github.com/milvus-io/milvus/internal/kv/mocks"
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
|
||||
catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
"github.com/milvus-io/milvus/internal/mocks"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
|
@ -49,7 +54,6 @@ func TestServer_CreateIndex(t *testing.T) {
|
|||
collID = UniqueID(1)
|
||||
fieldID = UniqueID(10)
|
||||
// indexID = UniqueID(100)
|
||||
indexName = "default_idx"
|
||||
typeParams = []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.DimKey,
|
||||
|
@ -65,7 +69,7 @@ func TestServer_CreateIndex(t *testing.T) {
|
|||
req = &indexpb.CreateIndexRequest{
|
||||
CollectionID: collID,
|
||||
FieldID: fieldID,
|
||||
IndexName: indexName,
|
||||
IndexName: "",
|
||||
TypeParams: typeParams,
|
||||
IndexParams: indexParams,
|
||||
Timestamp: 100,
|
||||
|
@ -83,6 +87,16 @@ func TestServer_CreateIndex(t *testing.T) {
|
|||
s := &Server{
|
||||
meta: &meta{
|
||||
catalog: catalog,
|
||||
collections: map[UniqueID]*collectionInfo{
|
||||
collID: {
|
||||
ID: collID,
|
||||
|
||||
Partitions: nil,
|
||||
StartPositions: nil,
|
||||
Properties: nil,
|
||||
CreatedAt: 0,
|
||||
},
|
||||
},
|
||||
indexes: map[UniqueID]map[UniqueID]*model.Index{},
|
||||
},
|
||||
allocator: newMockAllocator(),
|
||||
|
@ -90,12 +104,81 @@ func TestServer_CreateIndex(t *testing.T) {
|
|||
}
|
||||
|
||||
s.stateCode.Store(commonpb.StateCode_Healthy)
|
||||
|
||||
b := mocks.NewMockRootCoordClient(t)
|
||||
|
||||
t.Run("get field name failed", func(t *testing.T) {
|
||||
b.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error"))
|
||||
s.broker = broker.NewCoordinatorBroker(b)
|
||||
resp, err := s.CreateIndex(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
|
||||
assert.Equal(t, "mock error", resp.GetReason())
|
||||
})
|
||||
|
||||
b.ExpectedCalls = nil
|
||||
b.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: 0,
|
||||
Reason: "",
|
||||
Code: 0,
|
||||
Retriable: false,
|
||||
Detail: "",
|
||||
},
|
||||
Schema: &schemapb.CollectionSchema{
|
||||
Name: "test_index",
|
||||
Description: "test index",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: 0,
|
||||
Name: "pk",
|
||||
IsPrimaryKey: false,
|
||||
Description: "",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
TypeParams: nil,
|
||||
IndexParams: nil,
|
||||
AutoID: false,
|
||||
State: 0,
|
||||
ElementType: 0,
|
||||
DefaultValue: nil,
|
||||
IsDynamic: false,
|
||||
IsPartitionKey: false,
|
||||
},
|
||||
{
|
||||
FieldID: fieldID,
|
||||
Name: "FieldFloatVector",
|
||||
IsPrimaryKey: false,
|
||||
Description: "",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
TypeParams: nil,
|
||||
IndexParams: nil,
|
||||
AutoID: false,
|
||||
State: 0,
|
||||
ElementType: 0,
|
||||
DefaultValue: nil,
|
||||
IsDynamic: false,
|
||||
IsPartitionKey: false,
|
||||
},
|
||||
},
|
||||
EnableDynamicField: false,
|
||||
},
|
||||
CollectionID: collID,
|
||||
}, nil)
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
resp, err := s.CreateIndex(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("success with index exist", func(t *testing.T) {
|
||||
req.IndexName = ""
|
||||
resp, err := s.CreateIndex(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("server not healthy", func(t *testing.T) {
|
||||
s.stateCode.Store(commonpb.StateCode_Abnormal)
|
||||
resp, err := s.CreateIndex(ctx, req)
|
||||
|
@ -103,6 +186,7 @@ func TestServer_CreateIndex(t *testing.T) {
|
|||
assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady)
|
||||
})
|
||||
|
||||
req.IndexName = "FieldFloatVector"
|
||||
t.Run("index not consistent", func(t *testing.T) {
|
||||
s.stateCode.Store(commonpb.StateCode_Healthy)
|
||||
req.FieldID++
|
||||
|
|
|
@ -397,9 +397,6 @@ func (cit *createIndexTask) Execute(ctx context.Context) error {
|
|||
zap.Any("newExtraParams", cit.newExtraParams),
|
||||
)
|
||||
|
||||
if cit.req.GetIndexName() == "" {
|
||||
cit.req.IndexName = cit.fieldSchema.GetName()
|
||||
}
|
||||
var err error
|
||||
req := &indexpb.CreateIndexRequest{
|
||||
CollectionID: cit.collectionID,
|
||||
|
@ -852,10 +849,6 @@ func (gibpt *getIndexBuildProgressTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
gibpt.collectionID = collectionID
|
||||
|
||||
if gibpt.IndexName == "" {
|
||||
gibpt.IndexName = Params.CommonCfg.DefaultIndexName.GetValue()
|
||||
}
|
||||
|
||||
resp, err := gibpt.dataCoord.GetIndexBuildProgress(ctx, &indexpb.GetIndexBuildProgressRequest{
|
||||
CollectionID: collectionID,
|
||||
IndexName: gibpt.IndexName,
|
||||
|
|
Loading…
Reference in New Issue