mirror of https://github.com/milvus-io/milvus.git
fix multi loadCollectition not work (#6097)
Signed-off-by: xige-16 <xi.ge@zilliz.com>pull/6102/head
parent
9b549e0fa4
commit
cea83dde5a
|
@ -159,8 +159,8 @@ func (c *queryNodeCluster) LoadSegments(ctx context.Context, nodeID int64, in *q
|
|||
status, err := node.client.LoadSegments(ctx, in)
|
||||
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
|
||||
for _, info := range in.Infos {
|
||||
c.clusterMeta.addCollection(info.CollectionID, in.Schema)
|
||||
c.clusterMeta.addPartition(info.CollectionID, info.PartitionID)
|
||||
//c.clusterMeta.addCollection(info.CollectionID, in.Schema)
|
||||
//c.clusterMeta.addPartition(info.CollectionID, info.PartitionID)
|
||||
|
||||
node.addCollection(info.CollectionID, in.Schema)
|
||||
node.addPartition(info.CollectionID, info.PartitionID)
|
||||
|
@ -225,8 +225,8 @@ func (c *queryNodeCluster) WatchDmChannels(ctx context.Context, nodeID int64, in
|
|||
log.Debug("queryNode watch dm channel done")
|
||||
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
|
||||
collectionID := in.CollectionID
|
||||
c.clusterMeta.addCollection(collectionID, in.Schema)
|
||||
c.clusterMeta.addDmChannel(collectionID, nodeID, channels)
|
||||
//c.clusterMeta.addCollection(collectionID, in.Schema)
|
||||
//c.clusterMeta.addDmChannel(collectionID, nodeID, channels)
|
||||
|
||||
node.addCollection(collectionID, in.Schema)
|
||||
node.addDmChannel(collectionID, channels)
|
||||
|
|
|
@ -108,15 +108,6 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
|
|||
return status, err
|
||||
}
|
||||
|
||||
hasCollection := qc.meta.hasCollection(collectionID)
|
||||
if hasCollection {
|
||||
loadCollection, _ := qc.meta.getLoadCollection(collectionID)
|
||||
if loadCollection {
|
||||
status.Reason = "collection has been loaded"
|
||||
return status, nil
|
||||
}
|
||||
}
|
||||
|
||||
loadCollectionTask := &LoadCollectionTask{
|
||||
BaseTask: BaseTask{
|
||||
ctx: qc.loopCtx,
|
||||
|
|
|
@ -137,6 +137,7 @@ func (m *meta) showPartitions(collectionID UniqueID) ([]UniqueID, error) {
|
|||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
//TODO::should update after load collection
|
||||
if info, ok := m.collectionInfos[collectionID]; ok {
|
||||
return info.PartitionIDs, nil
|
||||
}
|
||||
|
|
|
@ -164,8 +164,6 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) error {
|
|||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
}
|
||||
|
||||
lct.meta.addCollection(collectionID, lct.Schema)
|
||||
lct.meta.setLoadCollection(collectionID, true)
|
||||
showPartitionRequest := &milvuspb.ShowPartitionsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_ShowPartitions,
|
||||
|
@ -178,14 +176,38 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) error {
|
|||
lct.result = status
|
||||
return err
|
||||
}
|
||||
log.Debug("loadCollectionTask: get recovery info", zap.Int64s("partitionIDs", showPartitionResponse.PartitionIDs))
|
||||
log.Debug("loadCollectionTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", showPartitionResponse.PartitionIDs))
|
||||
|
||||
partitionIDs := showPartitionResponse.PartitionIDs
|
||||
toLoadPartitionIDs := make([]UniqueID, 0)
|
||||
for _, id := range showPartitionResponse.PartitionIDs {
|
||||
if !lct.meta.hasPartition(collectionID, id) {
|
||||
toLoadPartitionIDs = append(toLoadPartitionIDs, id)
|
||||
hasCollection := lct.meta.hasCollection(collectionID)
|
||||
if hasCollection {
|
||||
loadCollection, _ := lct.meta.getLoadCollection(collectionID)
|
||||
if loadCollection {
|
||||
for _, partitionID := range partitionIDs {
|
||||
hasReleasePartition := lct.meta.hasReleasePartition(collectionID, partitionID)
|
||||
if hasReleasePartition {
|
||||
toLoadPartitionIDs = append(toLoadPartitionIDs, partitionID)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for _, partitionID := range partitionIDs {
|
||||
hasPartition := lct.meta.hasPartition(collectionID, partitionID)
|
||||
if !hasPartition {
|
||||
toLoadPartitionIDs = append(toLoadPartitionIDs, partitionID)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
toLoadPartitionIDs = partitionIDs
|
||||
}
|
||||
|
||||
log.Debug("loadCollectionTask: toLoadPartitionIDs", zap.Int64s("partitionIDs", toLoadPartitionIDs))
|
||||
lct.meta.addCollection(collectionID, lct.Schema)
|
||||
lct.meta.setLoadCollection(collectionID, true)
|
||||
for _, id := range toLoadPartitionIDs {
|
||||
lct.meta.addPartition(collectionID, id)
|
||||
}
|
||||
|
||||
segment2Binlog := make(map[UniqueID]*querypb.SegmentLoadInfo)
|
||||
watchRequests := make(map[string]*querypb.WatchDmChannelsRequest)
|
||||
|
|
Loading…
Reference in New Issue