enhance: clean invalid pipeline excluded segment info (#30429)

relate: https://github.com/milvus-io/milvus/issues/30281

---------

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
aoiasd 2024-03-01 10:43:01 +08:00 committed by GitHub
parent f3d1c75499
commit 3633923bb7
5 changed files with 93 additions and 27 deletions


@@ -30,7 +30,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
-	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // filterNode filter the invalid message of pipeline
@@ -38,7 +37,7 @@ type filterNode struct {
 	*BaseNode
 	collectionID     UniqueID
 	manager          *DataManager
-	excludedSegments *typeutil.ConcurrentMap[int64, uint64]
+	excludedSegments *ExcludedSegments
 	channel          string
 	InsertMsgPolicys []InsertMsgFilter
 	DeleteMsgPolicys []DeleteMsgFilter
@@ -96,7 +95,9 @@ func (fNode *filterNode) Operate(in Msg) Msg {
 			out.append(msg)
 		}
 	}
+	if fNode.excludedSegments.ShouldClean() {
+		fNode.excludedSegments.CleanInvalid(streamMsgPack.EndTs)
+	}
 	metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Inc()
 	return out
 }
@@ -133,7 +134,7 @@ func newFilterNode(
 	collectionID int64,
 	channel string,
 	manager *DataManager,
-	excludedSegments *typeutil.ConcurrentMap[int64, uint64],
+	excludedSegments *ExcludedSegments,
 	maxQueueLength int32,
 ) *filterNode {
 	return &filterNode{
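
The change in Operate above piggybacks a lazy cleanup on normal message processing: once the configured interval has elapsed, exclude entries whose timestamp is already behind the stream pack's end timestamp are dropped, so the bookkeeping no longer grows without bound. The following is a minimal, self-contained sketch of that pattern using simplified stand-in types; cleaner, shouldClean and cleanInvalid are illustrative names, not the real pipeline code.

```go
package main

import (
	"fmt"
	"time"
)

// cleaner is a trimmed-down stand-in for the excluded-segment bookkeeping;
// the names and fields here are illustrative, not the real pipeline types.
type cleaner struct {
	segments      map[int64]uint64 // segmentID -> exclude data up to this timestamp
	lastClean     time.Time
	cleanInterval time.Duration
}

func (c *cleaner) shouldClean() bool {
	return time.Since(c.lastClean) > c.cleanInterval
}

// cleanInvalid drops entries whose exclude timestamp is already behind the
// stream's end timestamp, mirroring what CleanInvalid does in the diff above.
func (c *cleaner) cleanInvalid(endTs uint64) {
	for id, ts := range c.segments {
		if ts < endTs {
			delete(c.segments, id)
		}
	}
	c.lastClean = time.Now()
}

func main() {
	c := &cleaner{
		segments:      map[int64]uint64{1: 100, 2: 300},
		cleanInterval: 0, // clean on every pack, purely for demonstration
	}

	// Per message pack: filter first, then clean lazily if the interval passed.
	endTs := uint64(200)
	if c.shouldClean() {
		c.cleanInvalid(endTs)
	}
	fmt.Println(c.segments) // map[2:300] -- segment 1's exclude info has expired
}
```

Doing the check inline keeps the hot path cheap: the interval check is a single time comparison, and the full map scan only runs once per interval.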


@@ -18,6 +18,7 @@ package pipeline
 import (
 	"testing"
+	"time"
 
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/suite"
@@ -26,7 +27,6 @@ import (
 	"github.com/milvus-io/milvus/internal/querynodev2/segments"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
-	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // test of filter node
@@ -38,7 +38,7 @@ type FilterNodeSuite struct {
 	channel         string
 	validSegmentIDs []int64
 
-	excludedSegments   *typeutil.ConcurrentMap[int64, uint64]
+	excludedSegments   *ExcludedSegments
 	excludedSegmentIDs []int64
 	insertSegmentIDs   []int64
 	deleteSegmentSum   int
@@ -62,10 +62,12 @@ func (suite *FilterNodeSuite) SetupSuite() {
 	suite.errSegmentID = 7
 
 	// init excludedSegment
-	suite.excludedSegments = typeutil.NewConcurrentMap[int64, uint64]()
+	suite.excludedSegments = NewExcludedSegments(0 * time.Second)
+	excludeInfo := map[int64]uint64{}
 	for _, id := range suite.excludedSegmentIDs {
-		suite.excludedSegments.Insert(id, 1)
+		excludeInfo[id] = 1
 	}
+	suite.excludedSegments.Insert(excludeInfo)
 }
 
 // test filter node with collection load collection


@@ -18,7 +18,13 @@ package pipeline
 import (
 	"fmt"
+	"sync"
+	"time"
 
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 )
@@ -54,11 +60,8 @@ func InsertOutOfTarget(n *filterNode, c *Collection, msg *InsertMsg) error {
 }
 
 func InsertExcluded(n *filterNode, c *Collection, msg *InsertMsg) error {
-	ts, ok := n.excludedSegments.Get(msg.SegmentID)
+	ok := n.excludedSegments.Verify(msg.SegmentID, msg.EndTimestamp)
 	if !ok {
-		return nil
-	}
-	if msg.EndTimestamp <= ts {
 		m := fmt.Sprintf("Segment excluded, id: %d", msg.GetSegmentID())
 		return merr.WrapErrSegmentLack(msg.GetSegmentID(), m)
 	}
@@ -88,3 +91,60 @@ func DeleteOutOfTarget(n *filterNode, c *Collection, msg *DeleteMsg) error {
 	// all growing will be in-memory to support dynamic partition load/release
 	return nil
 }
+
+type ExcludedSegments struct {
+	mu            sync.RWMutex
+	segments      map[int64]uint64 // segmentID -> excluded TS
+	lastClean     atomic.Time
+	cleanInterval time.Duration
+}
+
+func NewExcludedSegments(cleanInterval time.Duration) *ExcludedSegments {
+	return &ExcludedSegments{
+		segments:      make(map[int64]uint64),
+		cleanInterval: cleanInterval,
+	}
+}
+
+func (s *ExcludedSegments) Insert(excludeInfo map[int64]uint64) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for segmentID, ts := range excludeInfo {
+		log.Debug("add exclude info",
+			zap.Int64("segmentID", segmentID),
+			zap.Uint64("ts", ts),
+		)
+		s.segments[segmentID] = ts
+	}
+}
+
+func (s *ExcludedSegments) Verify(segmentID int64, ts uint64) bool {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	if excludeTs, ok := s.segments[segmentID]; ok && ts <= excludeTs {
+		return false
+	}
+	return true
+}
+
+func (s *ExcludedSegments) CleanInvalid(ts uint64) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	invalidExcludedInfos := []int64{}
+	for segmentID, excludeTs := range s.segments {
+		if excludeTs < ts {
+			invalidExcludedInfos = append(invalidExcludedInfos, segmentID)
+		}
+	}
+	for _, segmentID := range invalidExcludedInfos {
+		delete(s.segments, segmentID)
+	}
+	s.lastClean.Store(time.Now())
+}
+
+func (s *ExcludedSegments) ShouldClean() bool {
+	return time.Since(s.lastClean.Load()) > s.cleanInterval
+}
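
To make the semantics of the new type concrete, here is a hedged, test-style sketch of how ExcludedSegments behaves. The test name is hypothetical, and the file would have to live in the same querynodev2 pipeline package as the code above, since the type is internal to the repository.

```go
package pipeline

import (
	"testing"
	"time"
)

// TestExcludedSegmentsSketch is a hypothetical test exercising the semantics
// of the new type, mirroring the diff above.
func TestExcludedSegmentsSketch(t *testing.T) {
	ex := NewExcludedSegments(time.Minute)

	// Exclude segment 10 for all stream data with timestamps <= 100.
	ex.Insert(map[int64]uint64{10: 100})

	// Data at or before the exclude timestamp is rejected...
	if ex.Verify(10, 100) {
		t.Fatal("expected ts 100 on segment 10 to be filtered")
	}
	// ...while newer data and unknown segments pass.
	if !ex.Verify(10, 101) || !ex.Verify(99, 1) {
		t.Fatal("expected newer or unknown data to pass")
	}

	// Once the stream has moved past the exclude timestamp the entry is
	// useless, and CleanInvalid drops it.
	ex.CleanInvalid(101)
	if !ex.Verify(10, 50) {
		t.Fatal("expected exclude info for segment 10 to be cleaned")
	}

	// ShouldClean rate-limits the cleanup to the configured interval; right
	// after CleanInvalid it reports false for a one-minute interval.
	if ex.ShouldClean() {
		t.Fatal("expected no further cleanup within the interval")
	}
}
```

Verify returns false only for data at or before the recorded exclude timestamp; everything newer, and every unknown segment, passes through, which is why entries older than the stream position are safe to drop.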


@@ -17,15 +17,13 @@
 package pipeline
 
 import (
-	"go.uber.org/zap"
+	"time"
 
 	"github.com/milvus-io/milvus/internal/querynodev2/delegator"
 	base "github.com/milvus-io/milvus/internal/util/pipeline"
-	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
-	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // pipeline used for querynode
@@ -37,18 +35,12 @@ type Pipeline interface {
 type pipeline struct {
 	base.StreamPipeline
 
-	excludedSegments *typeutil.ConcurrentMap[int64, uint64]
+	excludedSegments *ExcludedSegments
 	collectionID     UniqueID
 }
 
 func (p *pipeline) ExcludedSegments(excludeInfo map[int64]uint64) { //(segInfos ...*datapb.SegmentInfo) {
-	for segmentID, ts := range excludeInfo {
-		log.Debug("pipeline add exclude info",
-			zap.Int64("segmentID", segmentID),
-			zap.Uint64("ts", ts),
-		)
-		p.excludedSegments.Insert(segmentID, ts)
-	}
+	p.excludedSegments.Insert(excludeInfo)
 }
 
 func (p *pipeline) Close() {
@@ -65,7 +57,7 @@ func NewPipeLine(
 	delegator delegator.ShardDelegator,
 ) (Pipeline, error) {
 	pipelineQueueLength := paramtable.Get().QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()
-	excludedSegments := typeutil.NewConcurrentMap[int64, uint64]()
+	excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second))
 
 	p := &pipeline{
 		collectionID: collectionID,
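
With the delegation above, the pipeline's ExcludedSegments entry point simply forwards the map to ExcludedSegments.Insert, and the per-entry debug logging moves into the new type. The rough caller-side sketch below is illustrative only: the helper name and wiring are invented, and it assumes the Pipeline interface exposes ExcludedSegments the same way the concrete type does.

```go
package pipeline

// excludeFlushedSegments is a hypothetical helper showing how a caller might
// hand exclude info to a pipeline after this change: stream data for the given
// segments is ignored up to seekTs, until the info expires and is cleaned.
func excludeFlushedSegments(p Pipeline, segmentIDs []int64, seekTs uint64) {
	info := make(map[int64]uint64, len(segmentIDs))
	for _, id := range segmentIDs {
		info[id] = seekTs
	}
	p.ExcludedSegments(info)
}
```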


@@ -1909,9 +1909,6 @@ func (p *queryCoordConfig) init(base *BaseTable) {
 type queryNodeConfig struct {
 	SoPath ParamItem `refreshable:"false"`
 
-	FlowGraphMaxQueueLength ParamItem `refreshable:"false"`
-	FlowGraphMaxParallelism ParamItem `refreshable:"false"`
-
 	// stats
 	// Deprecated: Never used
 	StatsPublishInterval ParamItem `refreshable:"true"`
@@ -1979,6 +1976,11 @@ type queryNodeConfig struct {
 	EnableWorkerSQCostMetrics ParamItem `refreshable:"true"`
 
 	ExprEvalBatchSize ParamItem `refreshable:"false"`
+
+	// pipeline
+	CleanExcludeSegInterval ParamItem `refreshable:"false"`
+	FlowGraphMaxQueueLength ParamItem `refreshable:"false"`
+	FlowGraphMaxParallelism ParamItem `refreshable:"false"`
 }
 
 func (p *queryNodeConfig) init(base *BaseTable) {
@@ -2426,6 +2428,15 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
 	}
 	p.ExprEvalBatchSize.Init(base.mgr)
+
+	p.CleanExcludeSegInterval = ParamItem{
+		Key:          "queryCoord.cleanExcludeSegmentInterval",
+		Version:      "2.4.0",
+		DefaultValue: "60",
+		Doc:          "interval (in seconds) at which the pipeline cleans up excluded segment info used to filter invalid data",
+		Export:       true,
+	}
+	p.CleanExcludeSegInterval.Init(base.mgr)
 }
 
 // /////////////////////////////////////////////////////////////////////////////
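
The new parameter defaults to 60 and is interpreted in seconds when NewPipeLine converts it with GetAsDuration(time.Second). Below is a minimal sketch of reading it, assuming an initialized paramtable inside a Milvus workspace; note that the key registered above lives under the queryCoord prefix even though the item sits in queryNodeConfig.

```go
package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	// paramtable must be initialized before any param can be read.
	paramtable.Init()

	// DefaultValue "60" is read in seconds, matching the conversion in
	// NewPipeLine: GetAsDuration(time.Second) -> 1m0s by default.
	interval := paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second)
	fmt.Println(interval)
}
```

Setting the value to 0 (as the updated test suite does) makes ShouldClean fire on every message pack, which is convenient for tests but not intended for production configurations.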