fix: Fix cannot specify partition name in binlog import (#32730) (#32829)

issue: https://github.com/milvus-io/milvus/issues/32807

pr: https://github.com/milvus-io/milvus/pull/32730

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/32841/head
yihao.dai 2024-05-08 11:59:30 +08:00 committed by GitHub
parent 879733b47a
commit 94bd2f0d19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 32 additions and 15 deletions

View File

@ -5987,14 +5987,16 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
return resp, nil
}
isBackup := importutilv2.IsBackup(req.GetOptions())
hasPartitionKey := typeutil.HasPartitionKey(schema.CollectionSchema)
if req.GetPartitionName() != "" && hasPartitionKey {
resp.Status = merr.Status(merr.WrapErrImportFailed("not allow to set partition name for collection with partition key"))
return resp, nil
}
var partitionIDs []int64
if req.GetPartitionName() == "" && hasPartitionKey {
if isBackup {
if req.GetPartitionName() == "" {
resp.Status = merr.Status(merr.WrapErrParameterInvalidMsg("partition not specified"))
return resp, nil
}
// Currently, Backup tool call import must with a partition name, each time restore a partition
partitions, err := globalMetaCache.GetPartitions(ctx, req.GetDbName(), req.GetCollectionName())
if err != nil {
resp.Status = merr.Status(err)
@ -6002,17 +6004,30 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
}
partitionIDs = lo.Values(partitions)
} else {
partitionName := req.GetPartitionName()
if req.GetPartitionName() == "" {
partitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
if hasPartitionKey {
if req.GetPartitionName() != "" {
resp.Status = merr.Status(merr.WrapErrImportFailed("not allow to set partition name for collection with partition key"))
return resp, nil
}
partitions, err := globalMetaCache.GetPartitions(ctx, req.GetDbName(), req.GetCollectionName())
if err != nil {
resp.Status = merr.Status(err)
return resp, nil
}
partitionIDs = lo.Values(partitions)
} else {
if req.GetPartitionName() == "" {
req.PartitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
}
partitionID, err := globalMetaCache.GetPartitionID(ctx, req.GetDbName(), req.GetCollectionName(), req.PartitionName)
if err != nil {
resp.Status = merr.Status(err)
return resp, nil
}
partitionIDs = []UniqueID{partitionID}
}
partitionID, err := globalMetaCache.GetPartitionID(ctx, req.GetDbName(), req.GetCollectionName(), partitionName)
if err != nil {
resp.Status = merr.Status(err)
return resp, nil
}
partitionIDs = []UniqueID{partitionID}
}
req.Files = lo.Filter(req.GetFiles(), func(file *internalpb.ImportFile, _ int) bool {
return len(file.GetPaths()) > 0
})
@ -6025,7 +6040,6 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
Params.DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(req.Files))))
return resp, nil
}
isBackup := importutilv2.IsBackup(req.GetOptions())
if !isBackup {
// check file type
for _, file := range req.GetFiles() {

View File

@ -199,6 +199,7 @@ func (s *BulkInsertSuite) TestBinlogImport() {
}
importResp, err := c.Proxy.ImportV2(ctx, &internalpb.ImportRequest{
CollectionName: collectionName,
PartitionName: paramtable.Get().CommonCfg.DefaultPartitionName.GetValue(),
Files: files,
Options: []*commonpb.KeyValuePair{
{Key: "startTs", Value: startTs},

View File

@ -799,6 +799,8 @@ class TestCreateImportJob(TestBase):
}
}
if is_partition_key:
payload["partitionName"] = "partition_0"
rsp = self.import_job_client.create_import_jobs(payload)
assert rsp['code'] == 200
# list import job