enhance: support varchar autoid when bulkinsert (#30377)

support varchar autoid when bulkinsert

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
pull/30402/head^2
smellthemoon 2024-02-01 19:45:09 +08:00 committed by GitHub
parent e22e8b30d4
commit 6bc10f9fdd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 70 additions and 13 deletions

View File

@ -1038,20 +1038,30 @@ func splitFieldsData(collectionInfo *CollectionInfo, fieldsData BlockData, shard
autoIDRange := make([]int64, 0)
if primaryKey.GetAutoID() {
log.Info("generating auto-id", zap.Int("rowCount", rowCount), zap.Int64("rowIDBegin", rowIDBegin))
if primaryKey.GetDataType() != schemapb.DataType_Int64 {
log.Warn("primary key field is auto-generated but the field type is not int64")
return nil, fmt.Errorf("primary key field is auto-generated but the field type is not int64")
}
if primaryKey.GetDataType() == schemapb.DataType_Int64 {
primaryDataArr := &storage.Int64FieldData{
Data: make([]int64, 0, rowCount),
}
for i := rowIDBegin; i < rowIDEnd; i++ {
primaryDataArr.Data = append(primaryDataArr.Data, i)
}
primaryDataArr := &storage.Int64FieldData{
Data: make([]int64, 0, rowCount),
}
for i := rowIDBegin; i < rowIDEnd; i++ {
primaryDataArr.Data = append(primaryDataArr.Data, i)
}
fieldsData[primaryKey.GetFieldID()] = primaryDataArr
autoIDRange = append(autoIDRange, rowIDBegin, rowIDEnd)
} else if primaryKey.GetDataType() == schemapb.DataType_VarChar {
primaryDataArr := &storage.StringFieldData{
Data: make([]string, 0, rowCount),
}
for i := rowIDBegin; i < rowIDEnd; i++ {
primaryDataArr.Data = append(primaryDataArr.Data, strconv.FormatInt(i, 10))
}
fieldsData[primaryKey.GetFieldID()] = primaryDataArr
autoIDRange = append(autoIDRange, rowIDBegin, rowIDEnd)
fieldsData[primaryKey.GetFieldID()] = primaryDataArr
autoIDRange = append(autoIDRange, rowIDBegin, rowIDEnd)
} else {
log.Warn("unsupported primary key type", zap.Int("type", int(primaryKey.GetDataType())))
return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("unsupported primary key type %d, primary key should be int64 or varchar", primaryKey.GetDataType()))
}
}
// if the primary key is not auto-generated and the user doesn't provide it, return an error

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@ -879,7 +880,7 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) {
parser.rowIDAllocator = newIDAllocator(ctx, t, nil)
})
t.Run("primary key auto-generated", func(t *testing.T) {
t.Run("int64 primary key auto-generated", func(t *testing.T) {
parser.collectionInfo.resetSchema(createNumpySchema())
schema := findSchema(parser.collectionInfo.Schema, schemapb.DataType_Int64)
schema.AutoID = true
@ -906,6 +907,52 @@ func Test_NumpyParserSplitFieldsData(t *testing.T) {
schema.AutoID = false
})
// Sub-test: splitFieldsData with an auto-ID primary key whose schema entry is
// the one findSchema returns for DataType_VarChar.
t.Run("varchar primary key auto-generated", func(t *testing.T) {
parser.collectionInfo.resetSchema(createNumpySchema())
// Clear the partition-key flag on the entry found for DataType_Int64, then
// mark the entry found for DataType_VarChar as AutoID and install it as the
// collection's primary key.
schema := findSchema(parser.collectionInfo.Schema, schemapb.DataType_Int64)
schema.IsPartitionKey = false
schema = findSchema(parser.collectionInfo.Schema, schemapb.DataType_VarChar)
schema.AutoID = true
parser.collectionInfo.PrimaryKey = schema
partitionID := int64(1)
fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
shards := createShardsData(sampleSchema(), fieldsData, 2, []int64{partitionID})
segmentData := genFieldsDataFunc()
// Happy path: the split must succeed and report a non-empty auto-ID range.
parser.autoIDRange, err = splitFieldsData(parser.collectionInfo, segmentData, shards, parser.rowIDAllocator)
assert.NoError(t, err)
assert.NotEmpty(t, parser.autoIDRange)
// The row counts of field ID 106 summed over every shard must equal the row
// count of field ID 106 in the input segment data.
totalNum := 0
for i := 0; i < int(parser.collectionInfo.ShardNum); i++ {
totalNum += shards[i][partitionID][106].RowNum()
}
assert.Equal(t, segmentData[106].RowNum(), totalNum)
// Error path: with the target entry for field ID 105 in shard 0 set to nil,
// splitFieldsData is expected to return an error.
shards[0][partitionID][105] = nil
parser.autoIDRange, err = splitFieldsData(parser.collectionInfo, segmentData, shards, parser.rowIDAllocator)
assert.Error(t, err)
// Turn AutoID back off on the schema entry modified above.
schema.AutoID = false
})
// Sub-test: splitFieldsData must reject an auto-ID primary key whose schema
// entry is the one findSchema returns for DataType_Bool.
t.Run("not support primary key type auto-generated", func(t *testing.T) {
parser.collectionInfo.resetSchema(createNumpySchema())
// Clear the partition-key flag on the entry found for DataType_Int64, then
// mark the entry found for DataType_Bool as AutoID and install it as the
// collection's primary key.
schema := findSchema(parser.collectionInfo.Schema, schemapb.DataType_Int64)
schema.IsPartitionKey = false
schema = findSchema(parser.collectionInfo.Schema, schemapb.DataType_Bool)
schema.AutoID = true
parser.collectionInfo.PrimaryKey = schema
partitionID := int64(1)
fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
shards := createShardsData(sampleSchema(), fieldsData, 2, []int64{partitionID})
segmentData := genFieldsDataFunc()
// The returned error must match merr.ErrParameterInvalid.
parser.autoIDRange, err = splitFieldsData(parser.collectionInfo, segmentData, shards, parser.rowIDAllocator)
assert.ErrorIs(t, err, merr.ErrParameterInvalid)
})
t.Run("has dynamic field", func(t *testing.T) {
schema := &schemapb.CollectionSchema{
Name: "schema",