mirror of https://github.com/milvus-io/milvus.git
Fix deadlock in release replica (#16925)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/16930/head^2
parent
3579ae8240
commit
a8b81e215b
|
@ -577,20 +577,9 @@ func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (
|
|||
}, nil
|
||||
}
|
||||
|
||||
if !node.queryShardService.hasQueryShard(req.GetDmlChannel()) {
|
||||
err := node.queryShardService.addQueryShard(req.Req.CollectionID, req.GetDmlChannel(), 0) // TODO: add replicaID in request or remove it in query shard
|
||||
if err != nil {
|
||||
return &internalpb.SearchResults{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
|
||||
if err != nil {
|
||||
log.Warn("Search failed, failed to get query shard", zap.String("dml channel", req.GetDmlChannel()), zap.Error(err))
|
||||
return &internalpb.SearchResults{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
@ -635,20 +624,9 @@ func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*i
|
|||
}, nil
|
||||
}
|
||||
|
||||
if !node.queryShardService.hasQueryShard(req.GetDmlChannel()) {
|
||||
err := node.queryShardService.addQueryShard(req.Req.CollectionID, req.GetDmlChannel(), 0) // TODO: add replicaID in request or remove it in query shard
|
||||
if err != nil {
|
||||
return &internalpb.RetrieveResults{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
|
||||
if err != nil {
|
||||
log.Warn("Query failed, failed to get query shard", zap.String("dml channel", req.GetDmlChannel()), zap.Error(err))
|
||||
return &internalpb.RetrieveResults{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
|
|
@ -702,8 +702,6 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
|
|||
zap.Any("collectionID", r.req.CollectionID),
|
||||
)
|
||||
|
||||
r.node.queryShardService.releaseCollection(r.req.CollectionID)
|
||||
|
||||
err := r.releaseReplica(r.node.streaming.replica, replicaStreaming)
|
||||
if err != nil {
|
||||
return fmt.Errorf("release collection failed, collectionID = %d, err = %s", r.req.CollectionID, err)
|
||||
|
@ -718,6 +716,8 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
|
|||
|
||||
debug.FreeOSMemory()
|
||||
|
||||
r.node.queryShardService.releaseCollection(r.req.CollectionID)
|
||||
|
||||
log.Info("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID))
|
||||
return nil
|
||||
}
|
||||
|
@ -728,6 +728,7 @@ func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replica
|
|||
|
||||
collection, err := replica.getCollectionByID(r.req.CollectionID)
|
||||
if err != nil {
|
||||
replica.queryUnlock()
|
||||
return err
|
||||
}
|
||||
// set release time
|
||||
|
|
Loading…
Reference in New Issue