fix: Use pk from binlog during import (#32118)

During binlog import, even if the primary key's autoID is set to true,
the primary key from the binlog should be used instead of being
reassigned.

issue: https://github.com/milvus-io/milvus/discussions/31943,
https://github.com/milvus-io/milvus/issues/28521

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/32307/head
yihao.dai 2024-04-16 14:51:20 +08:00 committed by GitHub
parent e19d17076f
commit 558feed5ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 58 additions and 3 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -161,6 +162,11 @@ func NewPreImportTask(req *datapb.PreImportRequest) Task {
}
})
ctx, cancel := context.WithCancel(context.Background())
// During binlog import, even if the primary key's autoID is set to true,
// the primary key from the binlog should be used instead of being reassigned.
if importutilv2.IsBackup(req.GetOptions()) {
UnsetAutoID(req.GetSchema())
}
return &PreImportTask{
PreImportTask: &datapb.PreImportTask{
JobID: req.GetJobID(),
@ -230,6 +236,11 @@ type ImportTask struct {
func NewImportTask(req *datapb.ImportRequest) Task {
ctx, cancel := context.WithCancel(context.Background())
// During binlog import, even if the primary key's autoID is set to true,
// the primary key from the binlog should be used instead of being reassigned.
if importutilv2.IsBackup(req.GetOptions()) {
UnsetAutoID(req.GetSchema())
}
task := &ImportTask{
ImportTaskV2: &datapb.ImportTaskV2{
JobID: req.GetJobID(),

View File

@ -217,3 +217,12 @@ func LogStats(manager TaskManager) {
tasks = manager.GetBy(WithType(ImportTaskType))
logFunc(tasks, ImportTaskType)
}
func UnsetAutoID(schema *schemapb.CollectionSchema) {
for _, field := range schema.GetFields() {
if field.GetIsPrimaryKey() && field.GetAutoID() {
field.AutoID = false
return
}
}
}

View File

@ -89,3 +89,27 @@ func Test_AppendSystemFieldsData(t *testing.T) {
assert.Equal(t, count, insertData.Data[common.RowIDField].RowNum())
assert.Equal(t, count, insertData.Data[common.TimeStampField].RowNum())
}
func Test_UnsetAutoID(t *testing.T) {
pkField := &schemapb.FieldSchema{
FieldID: 100,
Name: "pk",
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
AutoID: true,
}
vecField := &schemapb.FieldSchema{
FieldID: 101,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
}
schema := &schemapb.CollectionSchema{}
schema.Fields = []*schemapb.FieldSchema{pkField, vecField}
UnsetAutoID(schema)
for _, field := range schema.GetFields() {
if field.GetIsPrimaryKey() {
assert.False(t, schema.GetFields()[0].GetAutoID())
}
}
}

View File

@ -22,6 +22,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -37,7 +38,7 @@ import (
"github.com/milvus-io/milvus/tests/integration"
)
func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64) {
func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64, *schemapb.IDs) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := s.Cluster
@ -86,6 +87,7 @@ func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64) {
})
s.NoError(err)
s.Equal(int32(0), insertResult.GetStatus().GetCode())
insertedIDs := insertResult.GetIDs()
// flush
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
@ -148,7 +150,7 @@ func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64) {
// get collectionID and partitionID
collectionID := showCollectionsResp.GetCollectionIds()[0]
partitionID := showPartitionsResp.GetPartitionIDs()[0]
return collectionID, partitionID
return collectionID, partitionID, insertedIDs
}
func (s *BulkInsertSuite) TestBinlogImport() {
@ -157,7 +159,7 @@ func (s *BulkInsertSuite) TestBinlogImport() {
endTs = "548373346338803234"
)
collectionID, partitionID := s.PrepareCollectionA()
collectionID, partitionID, insertedIDs := s.PrepareCollectionA()
c := s.Cluster
ctx, cancel := context.WithTimeout(c.GetContext(), 60*time.Second)
@ -252,4 +254,13 @@ func (s *BulkInsertSuite) TestBinlogImport() {
err = merr.CheckRPCCall(searchResult, err)
s.NoError(err)
s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
// check ids from collectionA, because during binlog import, even if the primary key's autoID is set to true,
// the primary key from the binlog should be used instead of being reassigned.
insertedIDsMap := lo.SliceToMap(insertedIDs.GetIntId().GetData(), func(id int64) (int64, struct{}) {
return id, struct{}{}
})
for _, id := range searchResult.GetResults().GetIds().GetIntId().GetData() {
_, ok := insertedIDsMap[id]
s.True(ok)
}
}