mirror of https://github.com/milvus-io/milvus.git
enhance: support upsert autoid==true (#30342)
related with: #29258 --------- Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>pull/34597/head^2
parent
e4115485b6
commit
07b94b4615
|
@ -627,7 +627,7 @@ func TestProxy(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
constructCollectionUpsertRequest := func() *milvuspb.UpsertRequest {
|
||||
constructCollectionUpsertRequestNoPK := func() *milvuspb.UpsertRequest {
|
||||
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
|
||||
bVecColumn := newBinaryVectorFieldData(binaryVecField, rowNum, dim)
|
||||
hashKeys := testutils.GenerateHashKeys(rowNum)
|
||||
|
@ -642,6 +642,22 @@ func TestProxy(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
constructCollectionUpsertRequestWithPK := func() *milvuspb.UpsertRequest {
|
||||
pkFieldData := newScalarFieldData(schema.Fields[0], int64Field, rowNum)
|
||||
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
|
||||
bVecColumn := newBinaryVectorFieldData(binaryVecField, rowNum, dim)
|
||||
hashKeys := testutils.GenerateHashKeys(rowNum)
|
||||
return &milvuspb.UpsertRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionName: partitionName,
|
||||
FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn, bVecColumn},
|
||||
HashKeys: hashKeys,
|
||||
NumRows: uint32(rowNum),
|
||||
}
|
||||
}
|
||||
|
||||
constructCreateIndexRequest := func(dataType schemapb.DataType) *milvuspb.CreateIndexRequest {
|
||||
req := &milvuspb.CreateIndexRequest{
|
||||
Base: nil,
|
||||
|
@ -2236,6 +2252,30 @@ func TestProxy(t *testing.T) {
|
|||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||
})
|
||||
|
||||
wg.Add(1)
|
||||
t.Run("upsert when autoID == true", func(t *testing.T) {
|
||||
defer wg.Done()
|
||||
// autoID==true but not pass pk in upsert, failed
|
||||
req := constructCollectionUpsertRequestNoPK()
|
||||
|
||||
resp, err := proxy.Upsert(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid)
|
||||
assert.Equal(t, 0, len(resp.SuccIndex))
|
||||
assert.Equal(t, rowNum, len(resp.ErrIndex))
|
||||
assert.Equal(t, int64(0), resp.UpsertCnt)
|
||||
|
||||
// autoID==true and pass pk in upsert, succeed
|
||||
req = constructCollectionUpsertRequestWithPK()
|
||||
|
||||
resp, err = proxy.Upsert(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||
assert.Equal(t, rowNum, len(resp.SuccIndex))
|
||||
assert.Equal(t, 0, len(resp.ErrIndex))
|
||||
assert.Equal(t, int64(rowNum), resp.UpsertCnt)
|
||||
})
|
||||
|
||||
wg.Add(1)
|
||||
t.Run("release partition", func(t *testing.T) {
|
||||
defer wg.Done()
|
||||
|
@ -2391,19 +2431,6 @@ func TestProxy(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
wg.Add(1)
|
||||
t.Run("upsert when autoID == true", func(t *testing.T) {
|
||||
defer wg.Done()
|
||||
req := constructCollectionUpsertRequest()
|
||||
|
||||
resp, err := proxy.Upsert(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid)
|
||||
assert.Equal(t, 0, len(resp.SuccIndex))
|
||||
assert.Equal(t, rowNum, len(resp.ErrIndex))
|
||||
assert.Equal(t, int64(0), resp.UpsertCnt)
|
||||
})
|
||||
|
||||
wg.Add(1)
|
||||
t.Run("drop collection", func(t *testing.T) {
|
||||
defer wg.Done()
|
||||
|
|
|
@ -163,7 +163,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
|
|||
// check primaryFieldData whether autoID is true or not
|
||||
// set rowIDs as primary data if autoID == true
|
||||
// TODO(dragondriver): in fact, NumRows is not trustable, we should check all input fields
|
||||
it.result.IDs, err = checkPrimaryFieldData(it.schema, it.result, it.insertMsg, true)
|
||||
it.result.IDs, err = checkPrimaryFieldData(it.schema, it.insertMsg, true)
|
||||
log := log.Ctx(ctx).With(zap.String("collectionName", collectionName))
|
||||
if err != nil {
|
||||
log.Warn("check primary field data and hash primary key failed",
|
||||
|
|
|
@ -179,10 +179,10 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
// check primaryFieldData whether autoID is true or not
|
||||
// only allow support autoID == false
|
||||
// use the passed pk as new pk when autoID == false
|
||||
// automatic generate pk as new pk wehen autoID == true
|
||||
var err error
|
||||
it.result.IDs, err = checkPrimaryFieldData(it.schema.CollectionSchema, it.result, it.upsertMsg.InsertMsg, false)
|
||||
it.result.IDs, err = checkPrimaryFieldData(it.schema.CollectionSchema, it.upsertMsg.InsertMsg, false)
|
||||
log := log.Ctx(ctx).With(zap.String("collectionName", it.upsertMsg.InsertMsg.CollectionName))
|
||||
if err != nil {
|
||||
log.Warn("check primary field data and hash primary key failed when upsert",
|
||||
|
|
|
@ -1123,7 +1123,7 @@ func isPartitionLoaded(ctx context.Context, qc types.QueryCoordClient, collID in
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error {
|
||||
func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg, inInsert bool) error {
|
||||
log := log.With(zap.String("collection", schema.GetName()))
|
||||
primaryKeyNum := 0
|
||||
autoGenFieldNum := 0
|
||||
|
@ -1142,16 +1142,20 @@ func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgst
|
|||
log.Warn("not primary key field, but set autoID true", zap.String("field", fieldSchema.GetName()))
|
||||
return merr.WrapErrParameterInvalidMsg("only primary key could be with AutoID enabled")
|
||||
}
|
||||
|
||||
if fieldSchema.IsPrimaryKey {
|
||||
primaryKeyNum++
|
||||
}
|
||||
if fieldSchema.GetDefaultValue() != nil && fieldSchema.IsPrimaryKey {
|
||||
return merr.WrapErrParameterInvalidMsg("primary key can't be with default value")
|
||||
}
|
||||
if fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && inInsert {
|
||||
// when inInsert, no need to pass when pk is autoid and SkipAutoIDCheck is false
|
||||
autoGenFieldNum++
|
||||
}
|
||||
if _, ok := dataNameSet[fieldSchema.GetName()]; !ok {
|
||||
if fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() {
|
||||
// no need to pass when pk is autoid and SkipAutoIDCheck is false
|
||||
autoGenFieldNum++
|
||||
if fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && inInsert {
|
||||
// autoGenField
|
||||
continue
|
||||
}
|
||||
if fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() {
|
||||
|
@ -1175,7 +1179,8 @@ func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgst
|
|||
}
|
||||
|
||||
expectedNum := len(schema.Fields)
|
||||
actualNum := autoGenFieldNum + len(insertMsg.FieldsData)
|
||||
actualNum := len(insertMsg.FieldsData) + autoGenFieldNum
|
||||
|
||||
if expectedNum != actualNum {
|
||||
log.Warn("the number of fields is not the same as needed", zap.Int("expected", expectedNum), zap.Int("actual", actualNum))
|
||||
return merr.WrapErrParameterInvalid(expectedNum, actualNum, "more fieldData has pass in")
|
||||
|
@ -1184,20 +1189,21 @@ func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgst
|
|||
return nil
|
||||
}
|
||||
|
||||
func checkPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.MutationResult, insertMsg *msgstream.InsertMsg, inInsert bool) (*schemapb.IDs, error) {
|
||||
func checkPrimaryFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg, inInsert bool) (*schemapb.IDs, error) {
|
||||
log := log.With(zap.String("collectionName", insertMsg.CollectionName))
|
||||
rowNums := uint32(insertMsg.NRows())
|
||||
// TODO(dragondriver): in fact, NumRows is not trustable, we should check all input fields
|
||||
if insertMsg.NRows() <= 0 {
|
||||
return nil, merr.WrapErrParameterInvalid("invalid num_rows", fmt.Sprint(rowNums), "num_rows should be greater than 0")
|
||||
}
|
||||
|
||||
if err := checkFieldsDataBySchema(schema, insertMsg); err != nil {
|
||||
if err := checkFieldsDataBySchema(schema, insertMsg, inInsert); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema)
|
||||
if err != nil {
|
||||
log.Error("get primary field schema failed", zap.String("collectionName", insertMsg.CollectionName), zap.Any("schema", schema), zap.Error(err))
|
||||
log.Error("get primary field schema failed", zap.Any("schema", schema), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
if primaryFieldSchema.GetNullable() {
|
||||
|
@ -1215,18 +1221,18 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.M
|
|||
if !primaryFieldSchema.AutoID || skipAutoIDCheck {
|
||||
primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema)
|
||||
if err != nil {
|
||||
log.Info("get primary field data failed", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
|
||||
log.Info("get primary field data failed", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
// check primary key data not exist
|
||||
if typeutil.IsPrimaryFieldDataExist(insertMsg.GetFieldsData(), primaryFieldSchema) {
|
||||
return nil, fmt.Errorf("can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name)
|
||||
return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name))
|
||||
}
|
||||
// if autoID == true, currently support autoID for int64 and varchar PrimaryField
|
||||
primaryFieldData, err = autoGenPrimaryFieldData(primaryFieldSchema, insertMsg.GetRowIDs())
|
||||
if err != nil {
|
||||
log.Info("generate primary field data failed when autoID == true", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
|
||||
log.Info("generate primary field data failed when autoID == true", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
// if autoID == true, set the primary field data
|
||||
|
@ -1234,26 +1240,35 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.M
|
|||
insertMsg.FieldsData = append(insertMsg.FieldsData, primaryFieldData)
|
||||
}
|
||||
} else {
|
||||
// when checkPrimaryFieldData in upsert
|
||||
if primaryFieldSchema.AutoID {
|
||||
// upsert has not supported when autoID == true
|
||||
log.Info("can not upsert when auto id enabled",
|
||||
zap.String("primaryFieldSchemaName", primaryFieldSchema.Name))
|
||||
err := merr.WrapErrParameterInvalidMsg(fmt.Sprintf("upsert can not assign primary field data when auto id enabled %v", primaryFieldSchema.GetName()))
|
||||
result.Status = merr.Status(err)
|
||||
return nil, err
|
||||
primaryFieldID := primaryFieldSchema.FieldID
|
||||
primaryFieldName := primaryFieldSchema.Name
|
||||
for i, field := range insertMsg.GetFieldsData() {
|
||||
if field.FieldId == primaryFieldID || field.FieldName == primaryFieldName {
|
||||
primaryFieldData = field
|
||||
if primaryFieldSchema.AutoID {
|
||||
// use the passed pk as new pk when autoID == false
|
||||
// automatic generate pk as new pk wehen autoID == true
|
||||
newPrimaryFieldData, err := autoGenPrimaryFieldData(primaryFieldSchema, insertMsg.GetRowIDs())
|
||||
if err != nil {
|
||||
log.Info("generate new primary field data failed when upsert", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
insertMsg.FieldsData = append(insertMsg.GetFieldsData()[:i], insertMsg.GetFieldsData()[i+1:]...)
|
||||
insertMsg.FieldsData = append(insertMsg.FieldsData, newPrimaryFieldData)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema)
|
||||
if err != nil {
|
||||
log.Error("get primary field data failed when upsert", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
|
||||
return nil, err
|
||||
// must assign primary field data when upsert
|
||||
if primaryFieldData == nil {
|
||||
return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("must assign pk when upsert, primary field: %v", primaryFieldName))
|
||||
}
|
||||
}
|
||||
|
||||
// parse primaryFieldData to result.IDs, and as returned primary keys
|
||||
ids, err := parsePrimaryFieldData2IDs(primaryFieldData)
|
||||
if err != nil {
|
||||
log.Warn("parse primary field data to IDs failed", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
|
||||
log.Warn("parse primary field data to IDs failed", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
@ -1122,7 +1122,7 @@ func Test_isPartitionIsLoaded(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
||||
func Test_InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
||||
paramtable.Init()
|
||||
log.Info("InsertTaskcheckFieldsDataBySchema", zap.Bool("enable", Params.ProxyCfg.SkipAutoIDCheck.GetAsBool()))
|
||||
var err error
|
||||
|
@ -1148,7 +1148,7 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, true)
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, len(task.insertMsg.FieldsData), 0)
|
||||
})
|
||||
|
@ -1178,7 +1178,7 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, true)
|
||||
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
|
||||
})
|
||||
|
||||
|
@ -1219,7 +1219,7 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, true)
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, len(task.insertMsg.FieldsData), 2)
|
||||
})
|
||||
|
@ -1249,7 +1249,7 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, true)
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, len(task.insertMsg.FieldsData), 0)
|
||||
})
|
||||
|
@ -1278,7 +1278,7 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, true)
|
||||
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
|
||||
})
|
||||
|
||||
|
@ -1312,7 +1312,7 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, true)
|
||||
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
|
||||
})
|
||||
|
||||
|
@ -1350,7 +1350,7 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, true)
|
||||
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
|
||||
})
|
||||
|
||||
|
@ -1384,7 +1384,7 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, true)
|
||||
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
|
||||
})
|
||||
|
||||
|
@ -1397,13 +1397,13 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "a",
|
||||
AutoID: false,
|
||||
AutoID: true,
|
||||
IsPrimaryKey: true,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
Name: "b",
|
||||
AutoID: false,
|
||||
AutoID: true,
|
||||
IsPrimaryKey: true,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
|
@ -1418,7 +1418,7 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, true)
|
||||
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
|
||||
})
|
||||
|
||||
|
@ -1451,9 +1451,93 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, false)
|
||||
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
|
||||
})
|
||||
t.Run("normal when upsert", func(t *testing.T) {
|
||||
task := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "Test_CheckFieldsDataBySchema",
|
||||
Description: "Test_CheckFieldsDataBySchema",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "a",
|
||||
AutoID: false,
|
||||
IsPrimaryKey: true,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
Name: "b",
|
||||
AutoID: false,
|
||||
IsPrimaryKey: false,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
},
|
||||
},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "a",
|
||||
Type: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldName: "b",
|
||||
Type: schemapb.DataType_Int64,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, false)
|
||||
assert.NoError(t, err)
|
||||
|
||||
task = insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "Test_CheckFieldsDataBySchema",
|
||||
Description: "Test_CheckFieldsDataBySchema",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "a",
|
||||
AutoID: true,
|
||||
IsPrimaryKey: true,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
Name: "b",
|
||||
AutoID: false,
|
||||
IsPrimaryKey: false,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
},
|
||||
},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "a",
|
||||
Type: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldName: "b",
|
||||
Type: schemapb.DataType_Int64,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, false)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("skip the auto id", func(t *testing.T) {
|
||||
task := insertTask{
|
||||
|
@ -1494,7 +1578,7 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, true)
|
||||
assert.ErrorIs(t, merr.ErrParameterInvalid, err)
|
||||
assert.Equal(t, len(task.insertMsg.FieldsData), 2)
|
||||
|
||||
|
@ -1537,7 +1621,7 @@ func InsertTaskcheckFieldsDataBySchema(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg)
|
||||
err = checkFieldsDataBySchema(task.schema, task.insertMsg, true)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(task.insertMsg.FieldsData), 2)
|
||||
paramtable.Get().Reset(Params.ProxyCfg.SkipAutoIDCheck.Key)
|
||||
|
@ -1569,7 +1653,7 @@ func Test_InsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
_, err := checkPrimaryFieldData(case1.schema, case1.result, case1.insertMsg, true)
|
||||
_, err := checkPrimaryFieldData(case1.schema, case1.insertMsg, true)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// the num of passed fields is less than needed
|
||||
|
@ -1610,7 +1694,7 @@ func Test_InsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err = checkPrimaryFieldData(case2.schema, case2.result, case2.insertMsg, true)
|
||||
_, err = checkPrimaryFieldData(case2.schema, case2.insertMsg, true)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// autoID == false, no primary field schema
|
||||
|
@ -1650,7 +1734,7 @@ func Test_InsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err = checkPrimaryFieldData(case3.schema, case3.result, case3.insertMsg, true)
|
||||
_, err = checkPrimaryFieldData(case3.schema, case3.insertMsg, true)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// autoID == true, has primary field schema, but primary field data exist
|
||||
|
@ -1697,265 +1781,359 @@ func Test_InsertTaskCheckPrimaryFieldData(t *testing.T) {
|
|||
case4.schema.Fields[0].IsPrimaryKey = true
|
||||
case4.schema.Fields[0].AutoID = true
|
||||
case4.insertMsg.FieldsData[0] = newScalarFieldData(case4.schema.Fields[0], case4.schema.Fields[0].Name, 10)
|
||||
_, err = checkPrimaryFieldData(case4.schema, case4.result, case4.insertMsg, true)
|
||||
_, err = checkPrimaryFieldData(case4.schema, case4.insertMsg, true)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// autoID == true, has primary field schema, but DataType don't match
|
||||
// the data type of the data and the schema do not match
|
||||
// the data type of the data not matches the schema
|
||||
case4.schema.Fields[0].IsPrimaryKey = false
|
||||
case4.schema.Fields[1].IsPrimaryKey = true
|
||||
case4.schema.Fields[1].AutoID = true
|
||||
_, err = checkPrimaryFieldData(case4.schema, case4.result, case4.insertMsg, true)
|
||||
_, err = checkPrimaryFieldData(case4.schema, case4.insertMsg, true)
|
||||
assert.NotEqual(t, nil, err)
|
||||
}
|
||||
|
||||
func Test_UpsertTaskCheckPrimaryFieldData(t *testing.T) {
|
||||
// schema is empty, though won't happen in system
|
||||
// num_rows(0) should be greater than 0
|
||||
case1 := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{},
|
||||
},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
DbName: "TestUpsertTask_checkPrimaryFieldData",
|
||||
CollectionName: "TestUpsertTask_checkPrimaryFieldData",
|
||||
PartitionName: "TestUpsertTask_checkPrimaryFieldData",
|
||||
t.Run("schema is empty, though won't happen in system", func(t *testing.T) {
|
||||
task := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err := checkPrimaryFieldData(case1.schema, case1.result, case1.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
DbName: "TestUpsertTask_checkPrimaryFieldData",
|
||||
CollectionName: "TestUpsertTask_checkPrimaryFieldData",
|
||||
PartitionName: "TestUpsertTask_checkPrimaryFieldData",
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err := checkPrimaryFieldData(task.schema, task.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
})
|
||||
|
||||
// the num of passed fields is less than needed
|
||||
case2 := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "int64Field",
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
Name: "floatField",
|
||||
FieldID: 2,
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
},
|
||||
},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
t.Run("the num of passed fields is less than needed", func(t *testing.T) {
|
||||
task := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Type: schemapb.DataType_Int64,
|
||||
FieldName: "int64Field",
|
||||
Name: "int64Field",
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
Name: "floatField",
|
||||
FieldID: 2,
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err = checkPrimaryFieldData(case2.schema, case2.result, case2.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{
|
||||
Type: schemapb.DataType_Int64,
|
||||
FieldName: "int64Field",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err := checkPrimaryFieldData(task.schema, task.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
})
|
||||
|
||||
// autoID == false, no primary field schema
|
||||
// primary field is not found
|
||||
case3 := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "int64Field",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
Name: "floatField",
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
},
|
||||
},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err = checkPrimaryFieldData(case3.schema, case3.result, case3.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
|
||||
// autoID == true, upsert don't support it
|
||||
case4 := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "int64Field",
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
Name: "floatField",
|
||||
FieldID: 2,
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
},
|
||||
},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
t.Run("primary field is not found", func(t *testing.T) {
|
||||
task := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Type: schemapb.DataType_Float,
|
||||
FieldName: "floatField",
|
||||
Name: "int64Field",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
Name: "floatField",
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
case4.schema.Fields[0].IsPrimaryKey = true
|
||||
case4.schema.Fields[0].AutoID = true
|
||||
_, err = checkPrimaryFieldData(case4.schema, case4.result, case4.insertMsg, false)
|
||||
assert.ErrorIs(t, merr.Error(case4.result.GetStatus()), merr.ErrParameterInvalid)
|
||||
assert.NotEqual(t, nil, err)
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err := checkPrimaryFieldData(task.schema, task.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
})
|
||||
|
||||
// primary field data is nil, GetPrimaryFieldData fail
|
||||
case5 := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "int64Field",
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
Name: "floatField",
|
||||
FieldID: 2,
|
||||
DataType: schemapb.DataType_Float,
|
||||
t.Run("primary field data is nil", func(t *testing.T) {
|
||||
task := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "int64Field",
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
AutoID: false,
|
||||
},
|
||||
{
|
||||
Name: "floatField",
|
||||
FieldID: 2,
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{},
|
||||
{},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
case5.schema.Fields[0].IsPrimaryKey = true
|
||||
case5.schema.Fields[0].AutoID = false
|
||||
_, err = checkPrimaryFieldData(case5.schema, case5.result, case5.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err := checkPrimaryFieldData(task.schema, task.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
})
|
||||
|
||||
// only support DataType Int64 or VarChar as PrimaryField
|
||||
case6 := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "floatVectorField",
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
},
|
||||
{
|
||||
Name: "floatField",
|
||||
FieldID: 2,
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
},
|
||||
},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
t.Run("primary field type wrong", func(t *testing.T) {
|
||||
task := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: true,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Type: schemapb.DataType_FloatVector,
|
||||
FieldName: "floatVectorField",
|
||||
Name: "floatVectorField",
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
AutoID: true,
|
||||
IsPrimaryKey: true,
|
||||
},
|
||||
{
|
||||
Type: schemapb.DataType_Int64,
|
||||
FieldName: "floatField",
|
||||
Name: "floatField",
|
||||
FieldID: 2,
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
case6.schema.Fields[0].IsPrimaryKey = true
|
||||
case6.schema.Fields[0].AutoID = false
|
||||
_, err = checkPrimaryFieldData(case6.schema, case6.result, case6.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{
|
||||
Type: schemapb.DataType_FloatVector,
|
||||
FieldName: "floatVectorField",
|
||||
},
|
||||
{
|
||||
Type: schemapb.DataType_Int64,
|
||||
FieldName: "floatField",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err := checkPrimaryFieldData(task.schema, task.insertMsg, false)
|
||||
assert.NotEqual(t, nil, err)
|
||||
})
|
||||
|
||||
t.Run("upsert must assign pk", func(t *testing.T) {
|
||||
// autoid==true
|
||||
task := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: true,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "int64Field",
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
AutoID: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "int64Field",
|
||||
Type: schemapb.DataType_Int64,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err := checkPrimaryFieldData(task.schema, task.insertMsg, false)
|
||||
assert.NoError(t, nil, err)
|
||||
|
||||
// autoid==false
|
||||
task = insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "int64Field",
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
AutoID: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "int64Field",
|
||||
Type: schemapb.DataType_Int64,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err = checkPrimaryFieldData(task.schema, task.insertMsg, false)
|
||||
assert.NoError(t, nil, err)
|
||||
})
|
||||
|
||||
t.Run("will generate new pk when autoid == true", func(t *testing.T) {
|
||||
// autoid==true
|
||||
task := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestUpsertTask_checkPrimaryFieldData",
|
||||
Description: "TestUpsertTask_checkPrimaryFieldData",
|
||||
AutoID: true,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
Name: "int64Field",
|
||||
FieldID: 1,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
IsPrimaryKey: true,
|
||||
AutoID: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
insertMsg: &BaseInsertTask{
|
||||
InsertRequest: msgpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
},
|
||||
RowData: []*commonpb.Blob{
|
||||
{},
|
||||
},
|
||||
FieldsData: []*schemapb.FieldData{
|
||||
{
|
||||
FieldName: "int64Field",
|
||||
Type: schemapb.DataType_Int64,
|
||||
Field: &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{
|
||||
Data: []int64{2},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
RowIDs: []int64{1},
|
||||
},
|
||||
},
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
},
|
||||
}
|
||||
_, err := checkPrimaryFieldData(task.schema, task.insertMsg, false)
|
||||
newPK := task.insertMsg.FieldsData[0].GetScalars().GetLongData().GetData()
|
||||
assert.Equal(t, newPK, task.insertMsg.RowIDs)
|
||||
assert.NoError(t, nil, err)
|
||||
})
|
||||
}
|
||||
|
||||
func Test_ParseGuaranteeTs(t *testing.T) {
|
||||
|
|
|
@ -39,7 +39,7 @@ type UpsertSuite struct {
|
|||
integration.MiniClusterSuite
|
||||
}
|
||||
|
||||
func (s *UpsertSuite) TestUpsert() {
|
||||
func (s *UpsertSuite) TestUpsertAutoIDFalse() {
|
||||
c := s.Cluster
|
||||
ctx, cancel := context.WithCancel(c.GetContext())
|
||||
defer cancel()
|
||||
|
@ -151,11 +151,130 @@ func (s *UpsertSuite) TestUpsert() {
|
|||
}
|
||||
s.NoError(err)
|
||||
|
||||
log.Info("==================")
|
||||
log.Info("==================")
|
||||
log.Info("TestUpsert succeed")
|
||||
log.Info("==================")
|
||||
log.Info("==================")
|
||||
log.Info("===========================")
|
||||
log.Info("===========================")
|
||||
log.Info("TestUpsertAutoIDFalse succeed")
|
||||
log.Info("===========================")
|
||||
log.Info("===========================")
|
||||
}
|
||||
|
||||
func (s *UpsertSuite) TestUpsertAutoIDTrue() {
|
||||
c := s.Cluster
|
||||
ctx, cancel := context.WithCancel(c.GetContext())
|
||||
defer cancel()
|
||||
|
||||
prefix := "TestUpsert"
|
||||
dbName := ""
|
||||
collectionName := prefix + funcutil.GenRandomStr()
|
||||
dim := 128
|
||||
rowNum := 3000
|
||||
|
||||
schema := integration.ConstructSchema(collectionName, dim, true)
|
||||
marshaledSchema, err := proto.Marshal(schema)
|
||||
s.NoError(err)
|
||||
|
||||
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
Schema: marshaledSchema,
|
||||
ShardsNum: common.DefaultShardsNum,
|
||||
})
|
||||
s.NoError(err)
|
||||
|
||||
err = merr.Error(createCollectionStatus)
|
||||
if err != nil {
|
||||
log.Warn("createCollectionStatus fail reason", zap.Error(err))
|
||||
}
|
||||
|
||||
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
|
||||
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
|
||||
s.NoError(err)
|
||||
s.True(merr.Ok(showCollectionsResp.GetStatus()))
|
||||
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
|
||||
|
||||
pkFieldData := integration.NewInt64FieldData(integration.Int64Field, rowNum)
|
||||
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
|
||||
hashKeys := integration.GenerateHashKeys(rowNum)
|
||||
upsertResult, err := c.Proxy.Upsert(ctx, &milvuspb.UpsertRequest{
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn},
|
||||
HashKeys: hashKeys,
|
||||
NumRows: uint32(rowNum),
|
||||
})
|
||||
s.NoError(err)
|
||||
s.True(merr.Ok(upsertResult.GetStatus()))
|
||||
|
||||
// flush
|
||||
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
|
||||
DbName: dbName,
|
||||
CollectionNames: []string{collectionName},
|
||||
})
|
||||
s.NoError(err)
|
||||
segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
|
||||
ids := segmentIDs.GetData()
|
||||
s.Require().NotEmpty(segmentIDs)
|
||||
s.Require().True(has)
|
||||
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
|
||||
s.True(has)
|
||||
|
||||
segments, err := c.MetaWatcher.ShowSegments()
|
||||
s.NoError(err)
|
||||
s.NotEmpty(segments)
|
||||
for _, segment := range segments {
|
||||
log.Info("ShowSegments result", zap.String("segment", segment.String()))
|
||||
}
|
||||
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
|
||||
|
||||
// create index
|
||||
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
|
||||
CollectionName: collectionName,
|
||||
FieldName: integration.FloatVecField,
|
||||
IndexName: "_default",
|
||||
ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.IP),
|
||||
})
|
||||
s.NoError(err)
|
||||
err = merr.Error(createIndexStatus)
|
||||
if err != nil {
|
||||
log.Warn("createIndexStatus fail reason", zap.Error(err))
|
||||
}
|
||||
|
||||
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
|
||||
|
||||
// load
|
||||
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
})
|
||||
s.NoError(err)
|
||||
err = merr.Error(loadStatus)
|
||||
if err != nil {
|
||||
log.Warn("LoadCollection fail reason", zap.Error(err))
|
||||
}
|
||||
s.WaitForLoad(ctx, collectionName)
|
||||
// search
|
||||
expr := fmt.Sprintf("%s > 0", integration.Int64Field)
|
||||
nq := 10
|
||||
topk := 10
|
||||
roundDecimal := -1
|
||||
|
||||
params := integration.GetSearchParams(integration.IndexFaissIvfFlat, "")
|
||||
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
|
||||
integration.FloatVecField, schemapb.DataType_FloatVector, nil, metric.IP, params, nq, dim, topk, roundDecimal)
|
||||
|
||||
searchResult, _ := c.Proxy.Search(ctx, searchReq)
|
||||
|
||||
err = merr.Error(searchResult.GetStatus())
|
||||
if err != nil {
|
||||
log.Warn("searchResult fail reason", zap.Error(err))
|
||||
}
|
||||
s.NoError(err)
|
||||
|
||||
log.Info("===========================")
|
||||
log.Info("===========================")
|
||||
log.Info("TestUpsertAutoIDTrue succeed")
|
||||
log.Info("===========================")
|
||||
log.Info("===========================")
|
||||
}
|
||||
|
||||
func TestUpsert(t *testing.T) {
|
||||
|
|
|
@ -2066,6 +2066,7 @@ class TestUpsertInvalid(TestcaseBase):
|
|||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.skip(reason="smellthemoon: behavior changed")
|
||||
def test_upsert_with_auto_id(self):
|
||||
"""
|
||||
target: test upsert with auto id
|
||||
|
|
|
@ -1112,6 +1112,7 @@ class TestPartitionOperations(TestcaseBase):
|
|||
partition_w.upsert(upsert_data, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.skip(reason="smellthemoon: behavior changed")
|
||||
def test_partition_upsert_with_auto_id(self):
|
||||
"""
|
||||
target: test upsert data in partition when auto_id=True
|
||||
|
|
Loading…
Reference in New Issue