mirror of https://github.com/milvus-io/milvus.git
enhance: Reduce bloom filter lock contention between insert and delete in query coord (#32643) (#33284)
issue: #32530 pr: #32643 cause ProcessDelete need to check whether pk exist in bloom filter, and ProcessInsert need to update pk to bloom filter, when execute ProcessInsert and ProcessDelete in parallel, it will cause race condition in segment's bloom filter This PR execute ProcessInsert and ProcessDelete in serial to avoid block each other Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/33291/head
parent
3c31499bbc
commit
a988e7cabc
|
@ -456,7 +456,7 @@ func (node *QueryNode) Stop() error {
|
|||
case <-time.After(time.Second):
|
||||
metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(node.GetNodeID())).Set(float64(len(sealedSegments)))
|
||||
metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(node.GetNodeID())).Set(float64(channelNum))
|
||||
log.Info("migrate data...", zap.Int64("ServerID", paramtable.GetNodeID()),
|
||||
log.Info("migrate data...", zap.Int64("ServerID", node.GetNodeID()),
|
||||
zap.Int64s("sealedSegments", lo.Map(sealedSegments, func(s segments.Segment, i int) int64 {
|
||||
return s.ID()
|
||||
})),
|
||||
|
|
|
@ -17,12 +17,6 @@
|
|||
package pipeline
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
)
|
||||
|
||||
|
@ -36,62 +30,15 @@ type Node interface {
|
|||
|
||||
type nodeCtx struct {
|
||||
node Node
|
||||
|
||||
inputChannel chan Msg
|
||||
|
||||
next *nodeCtx
|
||||
checker *timerecord.GroupChecker
|
||||
|
||||
closeCh chan struct{} // notify work to exit
|
||||
closeWg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (c *nodeCtx) Start() {
|
||||
c.closeWg.Add(1)
|
||||
c.node.Start()
|
||||
go c.work()
|
||||
}
|
||||
|
||||
func (c *nodeCtx) Close() {
|
||||
close(c.closeCh)
|
||||
c.closeWg.Wait()
|
||||
}
|
||||
|
||||
func (c *nodeCtx) work() {
|
||||
defer c.closeWg.Done()
|
||||
name := fmt.Sprintf("nodeCtxTtChecker-%s", c.node.Name())
|
||||
if c.checker != nil {
|
||||
c.checker.Check(name)
|
||||
defer c.checker.Remove(name)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
// close
|
||||
case <-c.closeCh:
|
||||
c.node.Close()
|
||||
close(c.inputChannel)
|
||||
log.Debug("pipeline node closed", zap.String("nodeName", c.node.Name()))
|
||||
return
|
||||
case input := <-c.inputChannel:
|
||||
var output Msg
|
||||
output = c.node.Operate(input)
|
||||
if c.checker != nil {
|
||||
c.checker.Check(name)
|
||||
}
|
||||
if c.next != nil && output != nil {
|
||||
c.next.inputChannel <- output
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newNodeCtx(node Node) *nodeCtx {
|
||||
return &nodeCtx{
|
||||
node: node,
|
||||
inputChannel: make(chan Msg, node.MaxQueueLength()),
|
||||
closeCh: make(chan struct{}),
|
||||
closeWg: sync.WaitGroup{},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package pipeline
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -36,6 +37,8 @@ type pipeline struct {
|
|||
inputChannel chan Msg
|
||||
nodeTtInterval time.Duration
|
||||
enableTtChecker bool
|
||||
|
||||
checkerNames map[string]string
|
||||
}
|
||||
|
||||
func (p *pipeline) Add(nodes ...Node) {
|
||||
|
@ -50,6 +53,10 @@ func (p *pipeline) addNode(node Node) {
|
|||
nodeCtx.checker = timerecord.GetGroupChecker("fgNode", p.nodeTtInterval, func(list []string) {
|
||||
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", p.nodeTtInterval))
|
||||
})
|
||||
if p.checkerNames == nil {
|
||||
p.checkerNames = make(map[string]string)
|
||||
}
|
||||
p.checkerNames[nodeCtx.node.Name()] = fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name())
|
||||
}
|
||||
|
||||
if len(p.nodes) != 0 {
|
||||
|
@ -62,17 +69,31 @@ func (p *pipeline) addNode(node Node) {
|
|||
}
|
||||
|
||||
func (p *pipeline) Start() error {
|
||||
if len(p.nodes) == 0 {
|
||||
return ErrEmptyPipeline
|
||||
}
|
||||
for _, node := range p.nodes {
|
||||
node.Start()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pipeline) Close() {
|
||||
for _, node := range p.nodes {
|
||||
node.Close()
|
||||
}
|
||||
|
||||
func (p *pipeline) process() {
|
||||
if len(p.nodes) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
curNode := p.nodes[0]
|
||||
for curNode != nil {
|
||||
if len(curNode.inputChannel) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
input := <-curNode.inputChannel
|
||||
output := curNode.node.Operate(input)
|
||||
if _, ok := p.checkerNames[curNode.node.Name()]; ok {
|
||||
curNode.checker.Check(p.checkerNames[curNode.node.Name()])
|
||||
}
|
||||
if curNode.next != nil && output != nil {
|
||||
curNode.next.inputChannel <- output
|
||||
}
|
||||
curNode = curNode.next
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,8 +31,9 @@ type testNode struct {
|
|||
|
||||
func (t *testNode) Operate(in Msg) Msg {
|
||||
msg := in.(*msgstream.MsgPack)
|
||||
msg.BeginTs++
|
||||
if t.outChannel != nil {
|
||||
t.outChannel <- msg.BeginTs
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
|
@ -43,7 +44,7 @@ type PipelineSuite struct {
|
|||
}
|
||||
|
||||
func (suite *PipelineSuite) SetupTest() {
|
||||
suite.outChannel = make(chan msgstream.Timestamp)
|
||||
suite.outChannel = make(chan msgstream.Timestamp, 1)
|
||||
suite.pipeline = &pipeline{
|
||||
nodes: []*nodeCtx{},
|
||||
nodeTtInterval: 0,
|
||||
|
@ -52,7 +53,21 @@ func (suite *PipelineSuite) SetupTest() {
|
|||
|
||||
suite.pipeline.addNode(&testNode{
|
||||
BaseNode: &BaseNode{
|
||||
name: "test-node",
|
||||
name: "test-node1",
|
||||
maxQueueLength: 8,
|
||||
},
|
||||
})
|
||||
|
||||
suite.pipeline.addNode(&testNode{
|
||||
BaseNode: &BaseNode{
|
||||
name: "test-node2",
|
||||
maxQueueLength: 8,
|
||||
},
|
||||
})
|
||||
|
||||
suite.pipeline.addNode(&testNode{
|
||||
BaseNode: &BaseNode{
|
||||
name: "test-node3",
|
||||
maxQueueLength: 8,
|
||||
},
|
||||
outChannel: suite.outChannel,
|
||||
|
@ -62,10 +77,13 @@ func (suite *PipelineSuite) SetupTest() {
|
|||
func (suite *PipelineSuite) TestBasic() {
|
||||
suite.pipeline.Start()
|
||||
defer suite.pipeline.Close()
|
||||
suite.pipeline.inputChannel <- &msgstream.MsgPack{}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
suite.pipeline.inputChannel <- &msgstream.MsgPack{BeginTs: msgstream.Timestamp(i)}
|
||||
suite.pipeline.process()
|
||||
output := <-suite.outChannel
|
||||
suite.Equal(msgstream.Timestamp(1), output)
|
||||
suite.Equal(i, int(output))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipeline(t *testing.T) {
|
||||
|
|
|
@ -37,7 +37,7 @@ type StreamPipeline interface {
|
|||
}
|
||||
|
||||
type streamPipeline struct {
|
||||
*pipeline
|
||||
pipeline *pipeline
|
||||
input <-chan *msgstream.MsgPack
|
||||
dispatcher msgdispatcher.Client
|
||||
startOnce sync.Once
|
||||
|
@ -57,7 +57,8 @@ func (p *streamPipeline) work() {
|
|||
return
|
||||
case msg := <-p.input:
|
||||
log.RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs)))
|
||||
p.nodes[0].inputChannel <- msg
|
||||
p.pipeline.inputChannel <- msg
|
||||
p.pipeline.process()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -86,6 +87,10 @@ func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *streamPipeline) Add(nodes ...Node) {
|
||||
p.pipeline.Add(nodes...)
|
||||
}
|
||||
|
||||
func (p *streamPipeline) Start() error {
|
||||
var err error
|
||||
p.startOnce.Do(func() {
|
||||
|
|
|
@ -68,11 +68,11 @@ func (suite *StreamPipelineSuite) TestBasic() {
|
|||
|
||||
suite.pipeline.Start()
|
||||
defer suite.pipeline.Close()
|
||||
suite.inChannel <- &msgstream.MsgPack{}
|
||||
suite.inChannel <- &msgstream.MsgPack{BeginTs: 1001}
|
||||
|
||||
for i := 1; i <= suite.length; i++ {
|
||||
output := <-suite.outChannel
|
||||
suite.Equal(msgstream.Timestamp(i), output)
|
||||
suite.Equal(int64(1001), int64(output))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue