mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/20417/head
parent
16c8642b18
commit
a4b3a53123
|
@ -315,8 +315,6 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *querypb.WatchDmC
|
|||
zap.String("channelName", in.Infos[0].GetChannelName()),
|
||||
zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
|
||||
// currently we only support load one channel as a time
|
||||
node.taskLock.RLock(strconv.FormatInt(in.Infos[0].CollectionID, 10))
|
||||
defer node.taskLock.RUnlock(strconv.FormatInt(in.Infos[0].CollectionID, 10))
|
||||
future := node.taskPool.Submit(func() (interface{}, error) {
|
||||
log.Info("watchDmChannels start ", zap.Int64("collectionID", in.CollectionID),
|
||||
zap.String("channelName", in.Infos[0].GetChannelName()),
|
||||
|
@ -397,8 +395,6 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
|
|||
node: node,
|
||||
}
|
||||
|
||||
node.taskLock.Lock(strconv.FormatInt(dct.req.CollectionID, 10))
|
||||
defer node.taskLock.Unlock(strconv.FormatInt(dct.req.CollectionID, 10))
|
||||
err := node.scheduler.queue.Enqueue(dct)
|
||||
if err != nil {
|
||||
status := &commonpb.Status{
|
||||
|
@ -472,19 +468,6 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *querypb.LoadSegment
|
|||
zap.Int64s("segmentIDs", segmentIDs),
|
||||
zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
|
||||
|
||||
node.taskLock.RLock(strconv.FormatInt(in.CollectionID, 10))
|
||||
for _, segmentID := range segmentIDs {
|
||||
node.taskLock.Lock(strconv.FormatInt(segmentID, 10))
|
||||
}
|
||||
|
||||
// release all task locks
|
||||
defer func() {
|
||||
node.taskLock.RUnlock(strconv.FormatInt(in.CollectionID, 10))
|
||||
for _, id := range segmentIDs {
|
||||
node.taskLock.Unlock(strconv.FormatInt(id, 10))
|
||||
}
|
||||
}()
|
||||
|
||||
// TODO remove concurrent load segment for now, unless we solve the memory issue
|
||||
log.Info("loadSegmentsTask start ", zap.Int64("collectionID", in.CollectionID),
|
||||
zap.Int64s("segmentIDs", segmentIDs),
|
||||
|
@ -540,8 +523,6 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *querypb.Releas
|
|||
node: node,
|
||||
}
|
||||
|
||||
node.taskLock.Lock(strconv.FormatInt(dct.req.CollectionID, 10))
|
||||
defer node.taskLock.Unlock(strconv.FormatInt(dct.req.CollectionID, 10))
|
||||
err := node.scheduler.queue.Enqueue(dct)
|
||||
if err != nil {
|
||||
status := &commonpb.Status{
|
||||
|
@ -588,8 +569,6 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *querypb.Releas
|
|||
node: node,
|
||||
}
|
||||
|
||||
node.taskLock.Lock(strconv.FormatInt(dct.req.CollectionID, 10))
|
||||
defer node.taskLock.Unlock(strconv.FormatInt(dct.req.CollectionID, 10))
|
||||
err := node.scheduler.queue.Enqueue(dct)
|
||||
if err != nil {
|
||||
status := &commonpb.Status{
|
||||
|
@ -642,22 +621,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *querypb.ReleaseS
|
|||
}
|
||||
|
||||
log.Info("start to release segments", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs))
|
||||
node.taskLock.RLock(strconv.FormatInt(in.CollectionID, 10))
|
||||
sort.SliceStable(in.SegmentIDs, func(i, j int) bool {
|
||||
return in.SegmentIDs[i] < in.SegmentIDs[j]
|
||||
})
|
||||
|
||||
for _, segmentID := range in.SegmentIDs {
|
||||
node.taskLock.Lock(strconv.FormatInt(segmentID, 10))
|
||||
}
|
||||
|
||||
// release all task locks
|
||||
defer func() {
|
||||
node.taskLock.RUnlock(strconv.FormatInt(in.CollectionID, 10))
|
||||
for _, id := range in.SegmentIDs {
|
||||
node.taskLock.Unlock(strconv.FormatInt(id, 10))
|
||||
}
|
||||
}()
|
||||
for _, id := range in.SegmentIDs {
|
||||
switch in.GetScope() {
|
||||
case querypb.DataScope_Streaming:
|
||||
|
|
|
@ -49,7 +49,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/etcd"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
|
||||
"github.com/milvus-io/milvus/internal/util/lock"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
)
|
||||
|
@ -1651,7 +1650,6 @@ func genSimpleQueryNodeWithMQFactory(ctx context.Context, fac dependency.Factory
|
|||
log.Error("QueryNode init channel pool failed", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
node.taskLock = lock.NewKeyLock()
|
||||
etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
|
||||
node.etcdKV = etcdKV
|
||||
|
||||
|
|
|
@ -57,7 +57,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/gc"
|
||||
"github.com/milvus-io/milvus/internal/util/hardware"
|
||||
"github.com/milvus-io/milvus/internal/util/initcore"
|
||||
"github.com/milvus-io/milvus/internal/util/lock"
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
|
@ -126,8 +125,6 @@ type QueryNode struct {
|
|||
cgoPool *concurrency.Pool
|
||||
// pool for load/release channel
|
||||
taskPool *concurrency.Pool
|
||||
// lock to avoid same chanel/channel run multiple times
|
||||
taskLock *lock.KeyLock
|
||||
}
|
||||
|
||||
// NewQueryNode will return a QueryNode with abnormal state.
|
||||
|
@ -279,8 +276,6 @@ func (node *QueryNode) Init() error {
|
|||
return
|
||||
}
|
||||
|
||||
node.taskLock = lock.NewKeyLock()
|
||||
|
||||
// ensure every cgopool go routine is locked with a OS thread
|
||||
// so openmp in knowhere won't create too much request
|
||||
sig := make(chan struct{})
|
||||
|
|
Loading…
Reference in New Issue