Compatibility with handling index params (#19997)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/20058/head
cai.zhang 2022-10-25 16:59:30 +08:00 committed by GitHub
parent ec83bbf789
commit a60db370e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 172 additions and 22 deletions

View File

@ -5,15 +5,16 @@ import (
"sort"
"strings"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/cmd/tools/migration/versions"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/metastore/model"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/cmd/tools/migration/versions"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
func alias210ToAlias220(record *pb.CollectionInfo, ts Timestamp) *model.Alias {
@ -153,16 +154,35 @@ func combineToCollectionIndexesMeta220(fieldIndexes FieldIndexes210, collectionI
if err != nil {
return nil, err
}
newIndexParamsMap := make(map[string]string)
for _, kv := range indexInfo.IndexParams {
if kv.Key == common.IndexParamsKey {
params, err := funcutil.ParseIndexParamsMap(kv.Value)
if err != nil {
return nil, err
}
for k, v := range params {
newIndexParamsMap[k] = v
}
} else {
newIndexParamsMap[kv.Key] = kv.Value
}
}
newIndexParams := make([]*commonpb.KeyValuePair, 0)
for k, v := range newIndexParamsMap {
newIndexParams = append(newIndexParams, &commonpb.KeyValuePair{Key: k, Value: v})
}
record := &model.Index{
TenantID: "", // TODO: how to set this if we support mysql later?
CollectionID: collectionID,
FieldID: index.GetFiledID(),
IndexID: index.GetIndexID(),
IndexName: indexInfo.GetIndexName(),
IsDeleted: indexInfo.GetDeleted(),
CreateTime: indexInfo.GetCreateTime(),
TypeParams: field.GetTypeParams(),
IndexParams: indexInfo.GetIndexParams(),
TenantID: "", // TODO: how to set this if we support mysql later?
CollectionID: collectionID,
FieldID: index.GetFiledID(),
IndexID: index.GetIndexID(),
IndexName: indexInfo.GetIndexName(),
IsDeleted: indexInfo.GetDeleted(),
CreateTime: indexInfo.GetCreateTime(),
TypeParams: field.GetTypeParams(),
IndexParams: newIndexParams,
UserIndexParams: indexInfo.GetIndexParams(),
}
indexes.AddRecord(collectionID, index.GetIndexID(), record)
}

View File

@ -441,8 +441,10 @@ func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRe
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
if !i.metaTable.CanCreateIndex(req) {
ret.Reason = "CreateIndex failed: index already exist, but parameters are inconsistent"
ok, err := i.metaTable.CanCreateIndex(req)
if !ok {
log.Error("CreateIndex failed", zap.Error(err))
ret.Reason = err.Error()
return ret, nil
}
@ -458,7 +460,7 @@ func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRe
req: req,
}
err := i.sched.IndexAddQueue.Enqueue(t)
err = i.sched.IndexAddQueue.Enqueue(t)
if err != nil {
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
ret.Reason = err.Error()

View File

@ -23,6 +23,7 @@ import (
"math/rand"
"path"
"strconv"
"sync"
"testing"
"time"
@ -1075,3 +1076,124 @@ func TestIndexCoord_CheckHealth(t *testing.T) {
assert.NotEmpty(t, resp.Reasons)
})
}
func TestIndexCoord_CreateIndex(t *testing.T) {
ic := &IndexCoord{
metaTable: &metaTable{
catalog: nil,
indexLock: sync.RWMutex{},
segmentIndexLock: sync.RWMutex{},
collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 10,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "index_type",
Value: "IVF_FLAT",
},
{
Key: "metrics_type",
Value: "HAMMING",
},
{
Key: "nlist",
Value: "128",
},
},
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
},
},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
},
},
}
ic.UpdateStateCode(commonpb.StateCode_Healthy)
t.Run("index already exist, but params are inconsistent", func(t *testing.T) {
req := &indexpb.CreateIndexRequest{
CollectionID: collID,
FieldID: fieldID,
IndexName: indexName,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "index_type",
Value: "IVF_FLAT",
},
{
Key: "metrics_type",
Value: "HAMMING",
},
{
Key: "params",
Value: "{nlist: 128}",
},
},
Timestamp: 0,
IsAutoIndex: false,
UserIndexParams: nil,
}
resp, err := ic.CreateIndex(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
})
}

View File

@ -434,27 +434,33 @@ func (mt *metaTable) GetIndexesForCollection(collID UniqueID, indexName string)
return indexInfos
}
func (mt *metaTable) CanCreateIndex(req *indexpb.CreateIndexRequest) bool {
func (mt *metaTable) CanCreateIndex(req *indexpb.CreateIndexRequest) (bool, error) {
mt.indexLock.RLock()
defer mt.indexLock.RUnlock()
indexes, ok := mt.collectionIndexes[req.CollectionID]
if !ok {
return true
return true, nil
}
for _, index := range indexes {
if index.IsDeleted {
continue
}
if req.IndexName == index.IndexName {
return mt.checkParams(index, req)
if mt.checkParams(index, req) {
return true, nil
}
errMsg := fmt.Sprintf("index already exist, but parameters are inconsistent. source index: %v current index: %v",
fmt.Sprintf("{index_name: %s, field_id: %d, index_params: %v, type_params: %v}", index.IndexName, index.FieldID, index.IndexParams, index.TypeParams),
fmt.Sprintf("{index_name: %s, field_id: %d, index_params: %v, type_params: %v}", req.GetIndexName(), req.GetFieldID(), req.GetIndexParams(), req.GetTypeParams()))
return false, fmt.Errorf("CreateIndex failed: %s", errMsg)
}
if req.FieldID == index.FieldID {
// creating multiple indexes on same field is not supported
return false
return false, fmt.Errorf("CreateIndex failed: creating multiple indexes on same field is not supported")
}
}
return true
return true, nil
}
func (mt *metaTable) checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool {