From a4b3a53123078c83c9ab73b4ace158f8af8e76be Mon Sep 17 00:00:00 2001
From: congqixia
Date: Mon, 7 Nov 2022 09:45:01 +0800
Subject: [PATCH] Remove KeyLock from querynode task (#20344) (#20349)

Signed-off-by: Congqi Xia
Signed-off-by: Congqi Xia
---
 internal/querynode/impl.go       | 36 --------------------------------
 internal/querynode/mock_test.go  |  2 --
 internal/querynode/query_node.go |  5 -----
 3 files changed, 43 deletions(-)

diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go
index 3bcca33e0e..f778b44cc7 100644
--- a/internal/querynode/impl.go
+++ b/internal/querynode/impl.go
@@ -315,8 +315,6 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *querypb.WatchDmC
 		zap.String("channelName", in.Infos[0].GetChannelName()),
 		zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
 	// currently we only support load one channel as a time
-	node.taskLock.RLock(strconv.FormatInt(in.Infos[0].CollectionID, 10))
-	defer node.taskLock.RUnlock(strconv.FormatInt(in.Infos[0].CollectionID, 10))
 	future := node.taskPool.Submit(func() (interface{}, error) {
 		log.Info("watchDmChannels start ", zap.Int64("collectionID", in.CollectionID),
 			zap.String("channelName", in.Infos[0].GetChannelName()),
@@ -397,8 +395,6 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
 		node: node,
 	}
 
-	node.taskLock.Lock(strconv.FormatInt(dct.req.CollectionID, 10))
-	defer node.taskLock.Unlock(strconv.FormatInt(dct.req.CollectionID, 10))
 	err := node.scheduler.queue.Enqueue(dct)
 	if err != nil {
 		status := &commonpb.Status{
@@ -472,19 +468,6 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *querypb.LoadSegment
 		zap.Int64s("segmentIDs", segmentIDs),
 		zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
 
-	node.taskLock.RLock(strconv.FormatInt(in.CollectionID, 10))
-	for _, segmentID := range segmentIDs {
-		node.taskLock.Lock(strconv.FormatInt(segmentID, 10))
-	}
-
-	// release all task locks
-	defer func() {
-		node.taskLock.RUnlock(strconv.FormatInt(in.CollectionID, 10))
-		for _, id := range segmentIDs {
-			node.taskLock.Unlock(strconv.FormatInt(id, 10))
-		}
-	}()
-
 	// TODO remove concurrent load segment for now, unless we solve the memory issue
 	log.Info("loadSegmentsTask start ", zap.Int64("collectionID", in.CollectionID),
 		zap.Int64s("segmentIDs", segmentIDs),
@@ -540,8 +523,6 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *querypb.Releas
 		node: node,
 	}
 
-	node.taskLock.Lock(strconv.FormatInt(dct.req.CollectionID, 10))
-	defer node.taskLock.Unlock(strconv.FormatInt(dct.req.CollectionID, 10))
 	err := node.scheduler.queue.Enqueue(dct)
 	if err != nil {
 		status := &commonpb.Status{
@@ -588,8 +569,6 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *querypb.Releas
 		node: node,
 	}
 
-	node.taskLock.Lock(strconv.FormatInt(dct.req.CollectionID, 10))
-	defer node.taskLock.Unlock(strconv.FormatInt(dct.req.CollectionID, 10))
 	err := node.scheduler.queue.Enqueue(dct)
 	if err != nil {
 		status := &commonpb.Status{
@@ -642,22 +621,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *querypb.ReleaseS
 	}
 
 	log.Info("start to release segments", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs))
-	node.taskLock.RLock(strconv.FormatInt(in.CollectionID, 10))
-	sort.SliceStable(in.SegmentIDs, func(i, j int) bool {
-		return in.SegmentIDs[i] < in.SegmentIDs[j]
-	})
-	for _, segmentID := range in.SegmentIDs {
-		node.taskLock.Lock(strconv.FormatInt(segmentID, 10))
-	}
-
-	// release all task locks
-	defer func() {
-		node.taskLock.RUnlock(strconv.FormatInt(in.CollectionID, 10))
-		for _, id := range in.SegmentIDs {
-			node.taskLock.Unlock(strconv.FormatInt(id, 10))
-		}
-	}()
 
 	for _, id := range in.SegmentIDs {
 		switch in.GetScope() {
 		case querypb.DataScope_Streaming:
diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go
index 9c2abf41f5..69f69f657b 100644
--- a/internal/querynode/mock_test.go
+++ b/internal/querynode/mock_test.go
@@ -49,7 +49,6 @@ import (
 	"github.com/milvus-io/milvus/internal/util/etcd"
 	"github.com/milvus-io/milvus/internal/util/funcutil"
 	"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
-	"github.com/milvus-io/milvus/internal/util/lock"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 	"github.com/panjf2000/ants/v2"
 )
@@ -1651,7 +1650,6 @@ func genSimpleQueryNodeWithMQFactory(ctx context.Context, fac dependency.Factory
 		log.Error("QueryNode init channel pool failed", zap.Error(err))
 		return nil, err
 	}
-	node.taskLock = lock.NewKeyLock()
 
 	etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
 	node.etcdKV = etcdKV
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index 5c5d7bb0dd..e5f20fbc81 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -57,7 +57,6 @@ import (
 	"github.com/milvus-io/milvus/internal/util/gc"
 	"github.com/milvus-io/milvus/internal/util/hardware"
 	"github.com/milvus-io/milvus/internal/util/initcore"
-	"github.com/milvus-io/milvus/internal/util/lock"
 	"github.com/milvus-io/milvus/internal/util/metricsinfo"
 	"github.com/milvus-io/milvus/internal/util/paramtable"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
@@ -126,8 +125,6 @@ type QueryNode struct {
 	cgoPool *concurrency.Pool
 	// pool for load/release channel
 	taskPool *concurrency.Pool
-	// lock to avoid same chanel/channel run multiple times
-	taskLock *lock.KeyLock
 }
 
 // NewQueryNode will return a QueryNode with abnormal state.
@@ -279,8 +276,6 @@ func (node *QueryNode) Init() error {
 		return
 	}
 
-	node.taskLock = lock.NewKeyLock()
-
 	// ensure every cgopool go routine is locked with a OS thread
 	// so openmp in knowhere won't create too much request
 	sig := make(chan struct{})
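
The taskLock deleted above is a KeyLock: a table of read/write locks addressed by string key, used to serialize work per collection or segment ID rather than behind one global mutex. Below is a minimal sketch of such a primitive, assuming only the Lock/Unlock/RLock/RUnlock surface exercised in this patch; the real internal/util/lock.KeyLock may differ in detail, for example by freeing idle entries, which this sketch never does.

package main

import (
	"fmt"
	"strconv"
	"sync"
)

// KeyLock is an illustrative per-key read/write lock, not the Milvus
// implementation: one RWMutex per string key, created lazily under a
// table mutex. Entries are never freed, which a production version
// would need to handle.
type KeyLock struct {
	mu    sync.Mutex
	locks map[string]*sync.RWMutex
}

func NewKeyLock() *KeyLock {
	return &KeyLock{locks: make(map[string]*sync.RWMutex)}
}

// get returns the RWMutex for key, creating it on first use.
func (kl *KeyLock) get(key string) *sync.RWMutex {
	kl.mu.Lock()
	defer kl.mu.Unlock()
	if _, ok := kl.locks[key]; !ok {
		kl.locks[key] = &sync.RWMutex{}
	}
	return kl.locks[key]
}

func (kl *KeyLock) Lock(key string)    { kl.get(key).Lock() }
func (kl *KeyLock) Unlock(key string)  { kl.get(key).Unlock() }
func (kl *KeyLock) RLock(key string)   { kl.get(key).RLock() }
func (kl *KeyLock) RUnlock(key string) { kl.get(key).RUnlock() }

func main() {
	kl := NewKeyLock()
	key := strconv.FormatInt(int64(441), 10) // a made-up collection ID
	kl.RLock(key)                            // shared lock on the collection, as WatchDmChannels did
	defer kl.RUnlock(key)
	fmt.Println("holding read lock for key", key)
}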
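The ReleaseSegments hunk also drops a sort.SliceStable over in.SegmentIDs that ran before the per-segment locks were taken. Sorting first is the standard deadlock-avoidance discipline for acquiring several keyed locks: if every caller locks overlapping ID sets in one global order, no cycle of waiters can form. A self-contained sketch of that idea, with a hypothetical lock table and IDs:

package main

import (
	"fmt"
	"sort"
	"sync"
)

// Hypothetical per-segment mutex table standing in for the removed taskLock.
var segLocks = map[int64]*sync.Mutex{1: {}, 2: {}, 3: {}}

// lockAll acquires the locks in ascending ID order, the same trick as the
// sort.SliceStable in the removed ReleaseSegments code. Without the sort,
// one goroutine could hold segment 3 while waiting on 1 and another hold
// 1 while waiting on 3.
func lockAll(ids []int64) (unlock func()) {
	sorted := append([]int64(nil), ids...) // don't mutate the caller's slice
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	for _, id := range sorted {
		segLocks[id].Lock()
	}
	return func() {
		for _, id := range sorted {
			segLocks[id].Unlock()
		}
	}
}

func main() {
	var wg sync.WaitGroup
	for _, ids := range [][]int64{{3, 1}, {1, 3}, {2, 3, 1}} {
		wg.Add(1)
		go func(ids []int64) {
			defer wg.Done()
			unlock := lockAll(ids)
			defer unlock()
			fmt.Println("holding", ids)
		}(ids)
	}
	wg.Wait()
}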
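With the KeyLock gone, serialization for these RPCs rests on the machinery the diff leaves intact: UnsubDmChannel, ReleaseCollection, and ReleasePartitions still enqueue tasks into node.scheduler.queue, and WatchDmChannels still submits to node.taskPool and waits on the returned future. A rough sketch of that submit-and-await shape, assuming a hypothetical future-returning wrapper over a fixed-size worker pool; the actual internal/util/concurrency.Pool builds on the ants library imported in mock_test.go and differs in detail.

package main

import "fmt"

// future carries the result of one submitted task.
type future struct {
	ch  chan struct{}
	v   interface{}
	err error
}

// Await blocks until the task has run, then returns its result.
func (f *future) Await() (interface{}, error) {
	<-f.ch
	return f.v, f.err
}

// pool runs tasks on a fixed number of workers. The task channel is never
// closed here; a real pool would also support shutdown.
type pool struct {
	tasks chan func()
}

func newPool(workers int) *pool {
	p := &pool{tasks: make(chan func(), workers)}
	for i := 0; i < workers; i++ {
		go func() {
			for t := range p.tasks {
				t()
			}
		}()
	}
	return p
}

// Submit mirrors the shape used in WatchDmChannels: hand the work to the
// pool, get a future back, and Await it so the RPC still completes
// synchronously while the pool bounds concurrency.
func (p *pool) Submit(fn func() (interface{}, error)) *future {
	f := &future{ch: make(chan struct{})}
	p.tasks <- func() {
		f.v, f.err = fn()
		close(f.ch)
	}
	return f
}

func main() {
	p := newPool(2)
	f := p.Submit(func() (interface{}, error) {
		return "channel watched", nil
	})
	v, err := f.Await()
	fmt.Println(v, err)
}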