Fix error of repeated loading partition failures

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/4973/head^2
xige-16 2021-03-04 19:21:44 +08:00 committed by yefu.chen
parent 6dc938e6f4
commit ebaa5d4f5b
1 changed files with 28 additions and 12 deletions

View File

@ -244,20 +244,18 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
}
log.Debug("load collection start", zap.String("collectionID", fmt.Sprintln(collectionID)))
_, err := qs.replica.getCollectionByID(dbID, collectionID)
if err == nil {
log.Error("load collection end, collection already exist", zap.String("collectionID", fmt.Sprintln(collectionID)))
return fn(nil), nil
}
err = qs.replica.addCollection(dbID, collectionID, schema)
if err != nil {
return fn(err), err
}
err = qs.replica.addCollection(dbID, collectionID, schema)
if err != nil {
return fn(err), err
}
err = qs.watchDmChannels(dbID, collectionID)
if err != nil {
return fn(err), err
err = qs.watchDmChannels(dbID, collectionID)
if err != nil {
return fn(err), err
}
}
// get partitionIDs
@ -277,11 +275,29 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
}
partitionIDs := showPartitionResponse.PartitionIDs
partitionIDsToLoad := make([]UniqueID, 0)
partitionsInReplica, err := qs.replica.getPartitions(dbID, collectionID)
if err != nil {
return fn(err), err
}
for _, id := range partitionIDs {
cached := false
for _, partition := range partitionsInReplica {
if id == partition.id {
cached = true
break
}
}
if !cached {
partitionIDsToLoad = append(partitionIDsToLoad, id)
}
}
loadPartitionsRequest := &querypb.LoadPartitionRequest{
Base: req.Base,
DbID: dbID,
CollectionID: collectionID,
PartitionIDs: partitionIDs,
PartitionIDs: partitionIDsToLoad,
Schema: schema,
}