mirror of https://github.com/milvus-io/milvus.git
Combine similar function (#21752)
Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>pull/21776/head
parent
b91bb5a729
commit
ad6cbc990f
|
@ -164,7 +164,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
|
|||
// check primaryFieldData whether autoID is true or not
|
||||
// set rowIDs as primary data if autoID == true
|
||||
// TODO(dragondriver): in fact, NumRows is not trustable, we should check all input fields
|
||||
it.result.IDs, err = checkPrimaryFieldData(it.schema, it.insertMsg)
|
||||
it.result.IDs, err = checkPrimaryFieldData(it.schema, it.result, it.insertMsg, true)
|
||||
log := log.Ctx(ctx).With(zap.String("collectionName", collectionName))
|
||||
if err != nil {
|
||||
log.Error("check primary field data and hash primary key failed",
|
||||
|
|
|
@ -168,7 +168,7 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
|
|||
// check primaryFieldData whether autoID is true or not
|
||||
// only allow support autoID == false
|
||||
var err error
|
||||
it.result.IDs, err = upsertCheckPrimaryFieldData(it.schema, it.result, it.upsertMsg.InsertMsg)
|
||||
it.result.IDs, err = checkPrimaryFieldData(it.schema, it.result, it.upsertMsg.InsertMsg, false)
|
||||
log := log.Ctx(ctx).With(zap.String("collectionName", it.upsertMsg.InsertMsg.CollectionName))
|
||||
if err != nil {
|
||||
log.Error("check primary field data and hash primary key failed when upsert",
|
||||
|
|
|
@ -289,396 +289,3 @@ func TestUpsertTask_CheckAligned(t *testing.T) {
|
|||
err = case2.upsertMsg.InsertMsg.CheckAligned()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
// func TestProxyUpsertValid(t *testing.T) {
|
||||
// var err error
|
||||
// var wg sync.WaitGroup
|
||||
// paramtable.Init()
|
||||
|
||||
// path := "/tmp/milvus/rocksmq" + funcutil.GenRandomStr()
|
||||
// t.Setenv("ROCKSMQ_PATH", path)
|
||||
// defer os.RemoveAll(path)
|
||||
|
||||
// ctx, cancel := context.WithCancel(context.Background())
|
||||
// ctx = GetContext(ctx, "root:123456")
|
||||
// localMsg := true
|
||||
// factory := dependency.NewDefaultFactory(localMsg)
|
||||
// alias := "TestProxyUpsertValid"
|
||||
|
||||
// log.Info("Initialize parameter table of Proxy")
|
||||
|
||||
// rc := runRootCoord(ctx, localMsg)
|
||||
// log.Info("running RootCoord ...")
|
||||
|
||||
// if rc != nil {
|
||||
// defer func() {
|
||||
// err := rc.Stop()
|
||||
// assert.NoError(t, err)
|
||||
// log.Info("stop RootCoord")
|
||||
// }()
|
||||
// }
|
||||
|
||||
// dc := runDataCoord(ctx, localMsg)
|
||||
// log.Info("running DataCoord ...")
|
||||
|
||||
// if dc != nil {
|
||||
// defer func() {
|
||||
// err := dc.Stop()
|
||||
// assert.NoError(t, err)
|
||||
// log.Info("stop DataCoord")
|
||||
// }()
|
||||
// }
|
||||
|
||||
// dn := runDataNode(ctx, localMsg, alias)
|
||||
// log.Info("running DataNode ...")
|
||||
|
||||
// if dn != nil {
|
||||
// defer func() {
|
||||
// err := dn.Stop()
|
||||
// assert.NoError(t, err)
|
||||
// log.Info("stop DataNode")
|
||||
// }()
|
||||
// }
|
||||
|
||||
// qc := runQueryCoord(ctx, localMsg)
|
||||
// log.Info("running QueryCoord ...")
|
||||
|
||||
// if qc != nil {
|
||||
// defer func() {
|
||||
// err := qc.Stop()
|
||||
// assert.NoError(t, err)
|
||||
// log.Info("stop QueryCoord")
|
||||
// }()
|
||||
// }
|
||||
|
||||
// qn := runQueryNode(ctx, localMsg, alias)
|
||||
// log.Info("running QueryNode ...")
|
||||
|
||||
// if qn != nil {
|
||||
// defer func() {
|
||||
// err := qn.Stop()
|
||||
// assert.NoError(t, err)
|
||||
// log.Info("stop query node")
|
||||
// }()
|
||||
// }
|
||||
|
||||
// ic := runIndexCoord(ctx, localMsg)
|
||||
// log.Info("running IndexCoord ...")
|
||||
|
||||
// if ic != nil {
|
||||
// defer func() {
|
||||
// err := ic.Stop()
|
||||
// assert.NoError(t, err)
|
||||
// log.Info("stop IndexCoord")
|
||||
// }()
|
||||
// }
|
||||
|
||||
// in := runIndexNode(ctx, localMsg, alias)
|
||||
// log.Info("running IndexNode ...")
|
||||
|
||||
// if in != nil {
|
||||
// defer func() {
|
||||
// err := in.Stop()
|
||||
// assert.NoError(t, err)
|
||||
// log.Info("stop IndexNode")
|
||||
// }()
|
||||
// }
|
||||
|
||||
// time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// proxy, err := NewProxy(ctx, factory)
|
||||
// assert.NoError(t, err)
|
||||
// assert.NotNil(t, proxy)
|
||||
|
||||
// etcdcli, err := etcd.GetEtcdClient(
|
||||
// Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
|
||||
// Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
|
||||
// Params.EtcdCfg.Endpoints.GetAsStrings(),
|
||||
// Params.EtcdCfg.EtcdTLSCert.GetValue(),
|
||||
// Params.EtcdCfg.EtcdTLSKey.GetValue(),
|
||||
// Params.EtcdCfg.EtcdTLSCACert.GetValue(),
|
||||
// Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
|
||||
// defer etcdcli.Close()
|
||||
// assert.NoError(t, err)
|
||||
// proxy.SetEtcdClient(etcdcli)
|
||||
|
||||
// testServer := newProxyTestServer(proxy)
|
||||
// wg.Add(1)
|
||||
|
||||
// base := paramtable.BaseTable{}
|
||||
// base.Init(0)
|
||||
// var p paramtable.GrpcServerConfig
|
||||
// p.Init(typeutil.ProxyRole, &base)
|
||||
|
||||
// go testServer.startGrpc(ctx, &wg, &p)
|
||||
// assert.NoError(t, testServer.waitForGrpcReady())
|
||||
|
||||
// rootCoordClient, err := rcc.NewClient(ctx, Params.EtcdCfg.MetaRootPath.GetValue(), etcdcli)
|
||||
// assert.NoError(t, err)
|
||||
// err = rootCoordClient.Init()
|
||||
// assert.NoError(t, err)
|
||||
// err = funcutil.WaitForComponentHealthy(ctx, rootCoordClient, typeutil.RootCoordRole, attempts, sleepDuration)
|
||||
// assert.NoError(t, err)
|
||||
// proxy.SetRootCoordClient(rootCoordClient)
|
||||
// log.Info("Proxy set root coordinator client")
|
||||
|
||||
// dataCoordClient, err := grpcdatacoordclient2.NewClient(ctx, Params.EtcdCfg.MetaRootPath.GetValue(), etcdcli)
|
||||
// assert.NoError(t, err)
|
||||
// err = dataCoordClient.Init()
|
||||
// assert.NoError(t, err)
|
||||
// err = funcutil.WaitForComponentHealthy(ctx, dataCoordClient, typeutil.DataCoordRole, attempts, sleepDuration)
|
||||
// assert.NoError(t, err)
|
||||
// proxy.SetDataCoordClient(dataCoordClient)
|
||||
// log.Info("Proxy set data coordinator client")
|
||||
|
||||
// queryCoordClient, err := grpcquerycoordclient.NewClient(ctx, Params.EtcdCfg.MetaRootPath.GetValue(), etcdcli)
|
||||
// assert.NoError(t, err)
|
||||
// err = queryCoordClient.Init()
|
||||
// assert.NoError(t, err)
|
||||
// err = funcutil.WaitForComponentHealthy(ctx, queryCoordClient, typeutil.QueryCoordRole, attempts, sleepDuration)
|
||||
// assert.NoError(t, err)
|
||||
// proxy.SetQueryCoordClient(queryCoordClient)
|
||||
// log.Info("Proxy set query coordinator client")
|
||||
|
||||
// indexCoordClient, err := grpcindexcoordclient.NewClient(ctx, Params.EtcdCfg.MetaRootPath.GetValue(), etcdcli)
|
||||
// assert.NoError(t, err)
|
||||
// err = indexCoordClient.Init()
|
||||
// assert.NoError(t, err)
|
||||
// err = funcutil.WaitForComponentHealthy(ctx, indexCoordClient, typeutil.IndexCoordRole, attempts, sleepDuration)
|
||||
// assert.NoError(t, err)
|
||||
// proxy.SetIndexCoordClient(indexCoordClient)
|
||||
// log.Info("Proxy set index coordinator client")
|
||||
|
||||
// proxy.UpdateStateCode(commonpb.StateCode_Initializing)
|
||||
// err = proxy.Init()
|
||||
// assert.NoError(t, err)
|
||||
|
||||
// err = proxy.Start()
|
||||
// assert.NoError(t, err)
|
||||
// assert.Equal(t, commonpb.StateCode_Healthy, proxy.stateCode.Load().(commonpb.StateCode))
|
||||
|
||||
// // register proxy
|
||||
// err = proxy.Register()
|
||||
// assert.NoError(t, err)
|
||||
// log.Info("Register proxy done")
|
||||
// defer func() {
|
||||
// err := proxy.Stop()
|
||||
// assert.NoError(t, err)
|
||||
// }()
|
||||
|
||||
// prefix := "test_proxy_"
|
||||
// partitionPrefix := "test_proxy_partition_"
|
||||
// dbName := ""
|
||||
// collectionName := prefix + funcutil.GenRandomStr()
|
||||
// otherCollectionName := collectionName + "_other_" + funcutil.GenRandomStr()
|
||||
// partitionName := partitionPrefix + funcutil.GenRandomStr()
|
||||
// // otherPartitionName := partitionPrefix + "_other_" + funcutil.GenRandomStr()
|
||||
// shardsNum := int32(2)
|
||||
// int64Field := "int64"
|
||||
// floatVecField := "fVec"
|
||||
// dim := 128
|
||||
// rowNum := 30
|
||||
// // indexName := "_default"
|
||||
// // nlist := 10
|
||||
// // nprobe := 10
|
||||
// // topk := 10
|
||||
// // add a test parameter
|
||||
// // roundDecimal := 6
|
||||
// // nq := 10
|
||||
// // expr := fmt.Sprintf("%s > 0", int64Field)
|
||||
// // var segmentIDs []int64
|
||||
|
||||
// constructCollectionSchema := func() *schemapb.CollectionSchema {
|
||||
// pk := &schemapb.FieldSchema{
|
||||
// FieldID: 0,
|
||||
// Name: int64Field,
|
||||
// IsPrimaryKey: true,
|
||||
// Description: "",
|
||||
// DataType: schemapb.DataType_Int64,
|
||||
// TypeParams: nil,
|
||||
// IndexParams: nil,
|
||||
// AutoID: false,
|
||||
// }
|
||||
// fVec := &schemapb.FieldSchema{
|
||||
// FieldID: 0,
|
||||
// Name: floatVecField,
|
||||
// IsPrimaryKey: false,
|
||||
// Description: "",
|
||||
// DataType: schemapb.DataType_FloatVector,
|
||||
// TypeParams: []*commonpb.KeyValuePair{
|
||||
// {
|
||||
// Key: "dim",
|
||||
// Value: strconv.Itoa(dim),
|
||||
// },
|
||||
// },
|
||||
// IndexParams: nil,
|
||||
// AutoID: false,
|
||||
// }
|
||||
// return &schemapb.CollectionSchema{
|
||||
// Name: collectionName,
|
||||
// Description: "",
|
||||
// AutoID: false,
|
||||
// Fields: []*schemapb.FieldSchema{
|
||||
// pk,
|
||||
// fVec,
|
||||
// },
|
||||
// }
|
||||
// }
|
||||
// schema := constructCollectionSchema()
|
||||
|
||||
// constructCreateCollectionRequest := func() *milvuspb.CreateCollectionRequest {
|
||||
// bs, err := proto.Marshal(schema)
|
||||
// assert.NoError(t, err)
|
||||
// return &milvuspb.CreateCollectionRequest{
|
||||
// Base: nil,
|
||||
// DbName: dbName,
|
||||
// CollectionName: collectionName,
|
||||
// Schema: bs,
|
||||
// ShardsNum: shardsNum,
|
||||
// }
|
||||
// }
|
||||
// createCollectionReq := constructCreateCollectionRequest()
|
||||
|
||||
// constructPartitionReqUpsertRequestValid := func() *milvuspb.UpsertRequest {
|
||||
// pkFieldData := newScalarFieldData(schema.Fields[0], int64Field, rowNum)
|
||||
// fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
|
||||
// hashKeys := generateHashKeys(rowNum)
|
||||
// return &milvuspb.UpsertRequest{
|
||||
// Base: nil,
|
||||
// DbName: dbName,
|
||||
// CollectionName: collectionName,
|
||||
// PartitionName: partitionName,
|
||||
// FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn},
|
||||
// HashKeys: hashKeys,
|
||||
// NumRows: uint32(rowNum),
|
||||
// }
|
||||
// }
|
||||
|
||||
// constructCollectionUpsertRequestValid := func() *milvuspb.UpsertRequest {
|
||||
// pkFieldData := newScalarFieldData(schema.Fields[0], int64Field, rowNum)
|
||||
// fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
|
||||
// hashKeys := generateHashKeys(rowNum)
|
||||
// return &milvuspb.UpsertRequest{
|
||||
// Base: nil,
|
||||
// DbName: dbName,
|
||||
// CollectionName: collectionName,
|
||||
// PartitionName: partitionName,
|
||||
// FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn},
|
||||
// HashKeys: hashKeys,
|
||||
// NumRows: uint32(rowNum),
|
||||
// }
|
||||
// }
|
||||
|
||||
// wg.Add(1)
|
||||
// t.Run("create collection upsert valid", func(t *testing.T) {
|
||||
// defer wg.Done()
|
||||
// req := createCollectionReq
|
||||
// resp, err := proxy.CreateCollection(ctx, req)
|
||||
// assert.NoError(t, err)
|
||||
// assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
|
||||
// reqInvalidField := constructCreateCollectionRequest()
|
||||
// schema := constructCollectionSchema()
|
||||
// schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
|
||||
// Name: "StringField",
|
||||
// DataType: schemapb.DataType_String,
|
||||
// })
|
||||
// bs, err := proto.Marshal(schema)
|
||||
// assert.NoError(t, err)
|
||||
// reqInvalidField.CollectionName = "invalid_field_coll_upsert_valid"
|
||||
// reqInvalidField.Schema = bs
|
||||
|
||||
// resp, err = proxy.CreateCollection(ctx, reqInvalidField)
|
||||
// assert.NoError(t, err)
|
||||
// assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
|
||||
// })
|
||||
|
||||
// wg.Add(1)
|
||||
// t.Run("create partition", func(t *testing.T) {
|
||||
// defer wg.Done()
|
||||
// resp, err := proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{
|
||||
// Base: nil,
|
||||
// DbName: dbName,
|
||||
// CollectionName: collectionName,
|
||||
// PartitionName: partitionName,
|
||||
// })
|
||||
// assert.NoError(t, err)
|
||||
// assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
|
||||
// // create partition with non-exist collection -> fail
|
||||
// resp, err = proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{
|
||||
// Base: nil,
|
||||
// DbName: dbName,
|
||||
// CollectionName: otherCollectionName,
|
||||
// PartitionName: partitionName,
|
||||
// })
|
||||
// assert.NoError(t, err)
|
||||
// assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
// })
|
||||
|
||||
// wg.Add(1)
|
||||
// t.Run("upsert partition", func(t *testing.T) {
|
||||
// defer wg.Done()
|
||||
// req := constructPartitionReqUpsertRequestValid()
|
||||
|
||||
// resp, err := proxy.Upsert(ctx, req)
|
||||
// assert.NoError(t, err)
|
||||
// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
// assert.Equal(t, rowNum, len(resp.SuccIndex))
|
||||
// assert.Equal(t, 0, len(resp.ErrIndex))
|
||||
// assert.Equal(t, int64(rowNum), resp.UpsertCnt)
|
||||
// })
|
||||
|
||||
// wg.Add(1)
|
||||
// t.Run("upsert when autoID == false", func(t *testing.T) {
|
||||
// defer wg.Done()
|
||||
// req := constructCollectionUpsertRequestValid()
|
||||
|
||||
// resp, err := proxy.Upsert(ctx, req)
|
||||
// assert.NoError(t, err)
|
||||
// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
// assert.Equal(t, rowNum, len(resp.SuccIndex))
|
||||
// assert.Equal(t, 0, len(resp.ErrIndex))
|
||||
// assert.Equal(t, int64(rowNum), resp.UpsertCnt)
|
||||
// })
|
||||
|
||||
// proxy.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
|
||||
// wg.Add(1)
|
||||
// t.Run("Upsert fail, unhealthy", func(t *testing.T) {
|
||||
// defer wg.Done()
|
||||
// resp, err := proxy.Upsert(ctx, &milvuspb.UpsertRequest{})
|
||||
// assert.NoError(t, err)
|
||||
// assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
// })
|
||||
|
||||
// dmParallelism := proxy.sched.dmQueue.getMaxTaskNum()
|
||||
// proxy.sched.dmQueue.setMaxTaskNum(0)
|
||||
|
||||
// wg.Add(1)
|
||||
// t.Run("Upsert fail, dm queue full", func(t *testing.T) {
|
||||
// defer wg.Done()
|
||||
// resp, err := proxy.Upsert(ctx, &milvuspb.UpsertRequest{})
|
||||
// assert.NoError(t, err)
|
||||
// assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
// })
|
||||
// proxy.sched.dmQueue.setMaxTaskNum(dmParallelism)
|
||||
|
||||
// timeout := time.Nanosecond
|
||||
// shortCtx, shortCancel := context.WithTimeout(ctx, timeout)
|
||||
// defer shortCancel()
|
||||
// time.Sleep(timeout)
|
||||
|
||||
// wg.Add(1)
|
||||
// t.Run("Update fail, timeout", func(t *testing.T) {
|
||||
// defer wg.Done()
|
||||
// resp, err := proxy.Upsert(shortCtx, &milvuspb.UpsertRequest{})
|
||||
// assert.NoError(t, err)
|
||||
// assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
// })
|
||||
|
||||
// testServer.gracefulStop()
|
||||
// wg.Wait()
|
||||
// cancel()
|
||||
// }
|
||||
|
|
|
@ -894,7 +894,7 @@ func checkLengthOfFieldsData(schema *schemapb.CollectionSchema, insertMsg *msgst
|
|||
return nil
|
||||
}
|
||||
|
||||
func checkPrimaryFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) (*schemapb.IDs, error) {
|
||||
func checkPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.MutationResult, insertMsg *msgstream.InsertMsg, inInsert bool) (*schemapb.IDs, error) {
|
||||
rowNums := uint32(insertMsg.NRows())
|
||||
// TODO(dragondriver): in fact, NumRows is not trustable, we should check all input fields
|
||||
if insertMsg.NRows() <= 0 {
|
||||
|
@ -910,72 +910,45 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstre
|
|||
log.Error("get primary field schema failed", zap.String("collectionName", insertMsg.CollectionName), zap.Any("schema", schema), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get primaryFieldData whether autoID is true or not
|
||||
var primaryFieldData *schemapb.FieldData
|
||||
if !primaryFieldSchema.AutoID {
|
||||
primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema)
|
||||
if err != nil {
|
||||
log.Error("get primary field data failed", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
|
||||
return nil, err
|
||||
if inInsert {
|
||||
// when checkPrimaryFieldData in insert
|
||||
if !primaryFieldSchema.AutoID {
|
||||
primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema)
|
||||
if err != nil {
|
||||
log.Error("get primary field data failed", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
// check primary key data not exist
|
||||
if typeutil.IsPrimaryFieldDataExist(insertMsg.GetFieldsData(), primaryFieldSchema) {
|
||||
return nil, fmt.Errorf("can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name)
|
||||
}
|
||||
// if autoID == true, currently only support autoID for int64 PrimaryField
|
||||
primaryFieldData, err = autoGenPrimaryFieldData(primaryFieldSchema, insertMsg.GetRowIDs())
|
||||
if err != nil {
|
||||
log.Error("generate primary field data failed when autoID == true", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
// if autoID == true, set the primary field data
|
||||
// insertMsg.fieldsData need append primaryFieldData
|
||||
insertMsg.FieldsData = append(insertMsg.FieldsData, primaryFieldData)
|
||||
}
|
||||
} else {
|
||||
// check primary key data not exist
|
||||
if typeutil.IsPrimaryFieldDataExist(insertMsg.GetFieldsData(), primaryFieldSchema) {
|
||||
return nil, fmt.Errorf("can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name)
|
||||
// when checkPrimaryFieldData in upsert
|
||||
if primaryFieldSchema.AutoID {
|
||||
// upsert has not supported when autoID == true
|
||||
log.Info("can not upsert when auto id enabled",
|
||||
zap.String("primaryFieldSchemaName", primaryFieldSchema.Name))
|
||||
result.Status.ErrorCode = commonpb.ErrorCode_UpsertAutoIDTrue
|
||||
return nil, fmt.Errorf("upsert can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name)
|
||||
}
|
||||
// if autoID == true, currently only support autoID for int64 PrimaryField
|
||||
primaryFieldData, err = autoGenPrimaryFieldData(primaryFieldSchema, insertMsg.GetRowIDs())
|
||||
primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema)
|
||||
if err != nil {
|
||||
log.Error("generate primary field data failed when autoID == true", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
|
||||
log.Error("get primary field data failed when upsert", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
// if autoID == true, set the primary field data
|
||||
// insertMsg.fieldsData need append primaryFieldData
|
||||
insertMsg.FieldsData = append(insertMsg.FieldsData, primaryFieldData)
|
||||
}
|
||||
|
||||
// parse primaryFieldData to result.IDs, and as returned primary keys
|
||||
ids, err := parsePrimaryFieldData2IDs(primaryFieldData)
|
||||
if err != nil {
|
||||
log.Error("parse primary field data to IDs failed", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
// TODO(smellthemoon): can merge it with checkPrimaryFieldData
|
||||
func upsertCheckPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.MutationResult, insertMsg *msgstream.InsertMsg) (*schemapb.IDs, error) {
|
||||
rowNums := uint32(insertMsg.NRows())
|
||||
// TODO(dragondriver): in fact, NumRows is not trustable, we should check all input fields
|
||||
if insertMsg.NRows() <= 0 {
|
||||
return nil, errNumRowsLessThanOrEqualToZero(rowNums)
|
||||
}
|
||||
|
||||
if err := checkLengthOfFieldsData(schema, insertMsg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema)
|
||||
if err != nil {
|
||||
log.Error("get primary field schema failed", zap.String("collectionName", insertMsg.CollectionName), zap.Any("schema", schema), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get primaryFieldData whether autoID is true or not
|
||||
var primaryFieldData *schemapb.FieldData
|
||||
if primaryFieldSchema.AutoID {
|
||||
// upsert has not supported when autoID == true
|
||||
log.Info("can not upsert when auto id enabled",
|
||||
zap.String("primaryFieldSchemaName", primaryFieldSchema.Name))
|
||||
result.Status.ErrorCode = commonpb.ErrorCode_UpsertAutoIDTrue
|
||||
return nil, fmt.Errorf("upsert can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name)
|
||||
}
|
||||
primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema)
|
||||
if err != nil {
|
||||
log.Error("get primary field data failed when upsert", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// parse primaryFieldData to result.IDs, and as returned primary keys
|
||||
|
|
|
@ -1095,9 +1095,14 @@ func Test_InsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
PartitionName: "TestInsertTask_checkPrimaryFieldData",
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := checkPrimaryFieldData(case1.schema, case1.insertMsg)
|
||||
_, err := checkPrimaryFieldData(case1.schema, case1.result, case1.insertMsg, true)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// the num of passed fields is less than needed
|
||||
|
@ -1134,8 +1139,13 @@ func Test_InsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
Version: internalpb.InsertDataVersion_RowBased,
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
},
|
||||
}
|
||||
_, err = checkPrimaryFieldData(case2.schema, case2.insertMsg)
|
||||
_, err = checkPrimaryFieldData(case2.schema, case2.result, case2.insertMsg, true)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// autoID == false, no primary field schema
|
||||
|
@ -1171,8 +1181,13 @@ func Test_InsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
},
|
||||
}
|
||||
_, err = checkPrimaryFieldData(case3.schema, case3.insertMsg)
|
||||
_, err = checkPrimaryFieldData(case3.schema, case3.result, case3.insertMsg, true)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// autoID == true, has primary field schema, but primary field data exist
|
||||
|
@ -1212,11 +1227,16 @@ func Test_InsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
},
|
||||
}
|
||||
case4.schema.Fields[0].IsPrimaryKey = true
|
||||
case4.schema.Fields[0].AutoID = true
|
||||
case4.insertMsg.FieldsData[0] = newScalarFieldData(case4.schema.Fields[0], case4.schema.Fields[0].Name, 10)
|
||||
_, err = checkPrimaryFieldData(case4.schema, case4.insertMsg)
|
||||
_, err = checkPrimaryFieldData(case4.schema, case4.result, case4.insertMsg, true)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// autoID == true, has primary field schema, but DataType don't match
|
||||
|
@ -1224,7 +1244,7 @@ func Test_InsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
case4.schema.Fields[0].IsPrimaryKey = false
|
||||
case4.schema.Fields[1].IsPrimaryKey = true
|
||||
case4.schema.Fields[1].AutoID = true
|
||||
_, err = checkPrimaryFieldData(case4.schema, case4.insertMsg)
|
||||
_, err = checkPrimaryFieldData(case4.schema, case4.result, case4.insertMsg, true)
|
||||
assert.NotEqual(t, nil, err)
|
||||
}
|
||||
|
||||
|
@ -1254,7 +1274,7 @@ func Test_UpsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
_, err := upsertCheckPrimaryFieldData(case1.schema, case1.result, case1.insertMsg)
|
||||
_, err := checkPrimaryFieldData(case1.schema, case1.result, case1.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// the num of passed fields is less than needed
|
||||
|
@ -1299,7 +1319,7 @@ func Test_UpsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
_, err = upsertCheckPrimaryFieldData(case2.schema, case2.result, case2.insertMsg)
|
||||
_, err = checkPrimaryFieldData(case2.schema, case2.result, case2.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// autoID == false, no primary field schema
|
||||
|
@ -1341,7 +1361,7 @@ func Test_UpsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
_, err = upsertCheckPrimaryFieldData(case3.schema, case3.result, case3.insertMsg)
|
||||
_, err = checkPrimaryFieldData(case3.schema, case3.result, case3.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// autoID == true, upsert don't support it
|
||||
|
@ -1388,7 +1408,7 @@ func Test_UpsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
}
|
||||
case4.schema.Fields[0].IsPrimaryKey = true
|
||||
case4.schema.Fields[0].AutoID = true
|
||||
_, err = upsertCheckPrimaryFieldData(case4.schema, case4.result, case4.insertMsg)
|
||||
_, err = checkPrimaryFieldData(case4.schema, case4.result, case4.insertMsg, false)
|
||||
assert.Equal(t, commonpb.ErrorCode_UpsertAutoIDTrue, case4.result.Status.ErrorCode)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
|
@ -1434,7 +1454,7 @@ func Test_UpsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
}
|
||||
case5.schema.Fields[0].IsPrimaryKey = true
|
||||
case5.schema.Fields[0].AutoID = false
|
||||
_, err = upsertCheckPrimaryFieldData(case5.schema, case5.result, case5.insertMsg)
|
||||
_, err = checkPrimaryFieldData(case5.schema, case5.result, case5.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// only support DataType Int64 or VarChar as PrimaryField
|
||||
|
@ -1485,6 +1505,6 @@ func Test_UpsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
}
|
||||
case6.schema.Fields[0].IsPrimaryKey = true
|
||||
case6.schema.Fields[0].AutoID = false
|
||||
_, err = upsertCheckPrimaryFieldData(case6.schema, case6.result, case6.insertMsg)
|
||||
_, err = checkPrimaryFieldData(case6.schema, case6.result, case6.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue