From a8b7629315d39d4d3509b5880f22b0d753161e5a Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 28 Dec 2023 18:18:54 +0800 Subject: [PATCH] fix: exclude insertData before growing checkpoint (#29558) Resolves: #29556 Refine exclude segment function signature Add exclude growing before checkpoint logic Signed-off-by: Congqi Xia --- internal/querynodev2/pipeline/filter_node.go | 5 +- .../querynodev2/pipeline/filter_node_test.go | 12 ++--- .../querynodev2/pipeline/filter_policy.go | 4 +- internal/querynodev2/pipeline/pipeline.go | 17 +++--- internal/querynodev2/services.go | 52 +++++++------------ 5 files changed, 34 insertions(+), 56 deletions(-) diff --git a/internal/querynodev2/pipeline/filter_node.go b/internal/querynodev2/pipeline/filter_node.go index 8e4205cb66..42e02c95be 100644 --- a/internal/querynodev2/pipeline/filter_node.go +++ b/internal/querynodev2/pipeline/filter_node.go @@ -23,7 +23,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus/internal/proto/datapb" base "github.com/milvus-io/milvus/internal/util/pipeline" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -39,7 +38,7 @@ type filterNode struct { *BaseNode collectionID UniqueID manager *DataManager - excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo] + excludedSegments *typeutil.ConcurrentMap[int64, uint64] channel string InsertMsgPolicys []InsertMsgFilter DeleteMsgPolicys []DeleteMsgFilter @@ -134,7 +133,7 @@ func newFilterNode( collectionID int64, channel string, manager *DataManager, - excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo], + excludedSegments *typeutil.ConcurrentMap[int64, uint64], maxQueueLength int32, ) *filterNode { return &filterNode{ diff --git a/internal/querynodev2/pipeline/filter_node_test.go b/internal/querynodev2/pipeline/filter_node_test.go index 8d5eda9f33..07df419c2d 100644 --- a/internal/querynodev2/pipeline/filter_node_test.go +++ b/internal/querynodev2/pipeline/filter_node_test.go @@ -22,8 +22,6 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/suite" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -40,7 +38,7 @@ type FilterNodeSuite struct { channel string validSegmentIDs []int64 - excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo] + excludedSegments *typeutil.ConcurrentMap[int64, uint64] excludedSegmentIDs []int64 insertSegmentIDs []int64 deleteSegmentSum int @@ -64,13 +62,9 @@ func (suite *FilterNodeSuite) SetupSuite() { suite.errSegmentID = 7 // init excludedSegment - suite.excludedSegments = typeutil.NewConcurrentMap[int64, *datapb.SegmentInfo]() + suite.excludedSegments = typeutil.NewConcurrentMap[int64, uint64]() for _, id := range suite.excludedSegmentIDs { - suite.excludedSegments.Insert(id, &datapb.SegmentInfo{ - DmlPosition: &msgpb.MsgPosition{ - Timestamp: 1, - }, - }) + suite.excludedSegments.Insert(id, 1) } } diff --git a/internal/querynodev2/pipeline/filter_policy.go b/internal/querynodev2/pipeline/filter_policy.go index fb8f62a39f..3e51bb7577 100644 --- a/internal/querynodev2/pipeline/filter_policy.go +++ b/internal/querynodev2/pipeline/filter_policy.go @@ -54,11 +54,11 @@ func InsertOutOfTarget(n *filterNode, c *Collection, msg *InsertMsg) error { } func InsertExcluded(n *filterNode, c *Collection, msg *InsertMsg) error { - segInfo, ok := n.excludedSegments.Get(msg.SegmentID) + ts, ok := n.excludedSegments.Get(msg.SegmentID) if !ok { return nil } - if msg.EndTimestamp <= segInfo.GetDmlPosition().GetTimestamp() { + if msg.EndTimestamp <= ts { m := fmt.Sprintf("Segment excluded, id: %d", msg.GetSegmentID()) return merr.WrapErrSegmentLack(msg.GetSegmentID(), m) } diff --git a/internal/querynodev2/pipeline/pipeline.go b/internal/querynodev2/pipeline/pipeline.go index ffcb0fbc92..f5332d4d35 100644 --- a/internal/querynodev2/pipeline/pipeline.go +++ b/internal/querynodev2/pipeline/pipeline.go @@ -19,7 +19,6 @@ package pipeline import ( "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/querynodev2/delegator" base "github.com/milvus-io/milvus/internal/util/pipeline" "github.com/milvus-io/milvus/pkg/log" @@ -32,23 +31,23 @@ import ( // pipeline used for querynode type Pipeline interface { base.StreamPipeline - ExcludedSegments(segInfos ...*datapb.SegmentInfo) + ExcludedSegments(info map[int64]uint64) } type pipeline struct { base.StreamPipeline - excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo] + excludedSegments *typeutil.ConcurrentMap[int64, uint64] collectionID UniqueID } -func (p *pipeline) ExcludedSegments(segInfos ...*datapb.SegmentInfo) { - for _, segInfo := range segInfos { +func (p *pipeline) ExcludedSegments(excludeInfo map[int64]uint64) { //(segInfos ...*datapb.SegmentInfo) { + for segmentID, ts := range excludeInfo { log.Debug("pipeline add exclude info", - zap.Int64("segmentID", segInfo.GetID()), - zap.Uint64("ts", segInfo.GetDmlPosition().GetTimestamp()), + zap.Int64("segmentID", segmentID), + zap.Uint64("ts", ts), ) - p.excludedSegments.Insert(segInfo.GetID(), segInfo) + p.excludedSegments.Insert(segmentID, ts) } } @@ -66,7 +65,7 @@ func NewPipeLine( delegator delegator.ShardDelegator, ) (Pipeline, error) { pipelineQueueLength := paramtable.Get().QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32() - excludedSegments := typeutil.NewConcurrentMap[int64, *datapb.SegmentInfo]() + excludedSegments := typeutil.NewConcurrentMap[int64, uint64]() p := &pipeline{ collectionID: collectionID, diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 099494432e..ecdb18b13a 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -32,7 +32,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -301,25 +300,21 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm } }() - flushedSegments := lo.Map(channel.GetFlushedSegmentIds(), func(id int64, _ int) *datapb.SegmentInfo { - return &datapb.SegmentInfo{ - ID: id, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: typeutil.MaxTimestamp, - }, - } + growingInfo := lo.SliceToMap(channel.GetUnflushedSegmentIds(), func(id int64) (int64, uint64) { + info := req.GetSegmentInfos()[id] + return id, info.GetDmlPosition().GetTimestamp() }) - pipeline.ExcludedSegments(flushedSegments...) + pipeline.ExcludedSegments(growingInfo) + + flushedInfo := lo.SliceToMap(channel.GetFlushedSegmentIds(), func(id int64) (int64, uint64) { + return id, typeutil.MaxTimestamp + }) + pipeline.ExcludedSegments(flushedInfo) for _, channelInfo := range req.GetInfos() { - droppedInfos := lo.Map(channelInfo.GetDroppedSegmentIds(), func(id int64, _ int) *datapb.SegmentInfo { - return &datapb.SegmentInfo{ - ID: id, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: typeutil.MaxTimestamp, - }, - } + droppedInfos := lo.SliceToMap(channelInfo.GetDroppedSegmentIds(), func(id int64) (int64, uint64) { + return id, typeutil.MaxTimestamp }) - pipeline.ExcludedSegments(droppedInfos...) + pipeline.ExcludedSegments(droppedInfos) } err = loadL0Segments(ctx, delegator, req) @@ -574,15 +569,10 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release // in case of consumed it's growing segment again pipeline := node.pipelineManager.Get(req.GetShard()) if pipeline != nil { - droppedInfos := lo.Map(req.GetSegmentIDs(), func(id int64, _ int) *datapb.SegmentInfo { - return &datapb.SegmentInfo{ - ID: id, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: typeutil.MaxTimestamp, - }, - } + droppedInfos := lo.SliceToMap(req.GetSegmentIDs(), func(id int64) (int64, uint64) { + return id, typeutil.MaxTimestamp }) - pipeline.ExcludedSegments(droppedInfos...) + pipeline.ExcludedSegments(droppedInfos) } req.NeedTransfer = false @@ -1380,15 +1370,11 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion())) pipeline := node.pipelineManager.Get(req.GetChannel()) if pipeline != nil { - droppedInfos := lo.Map(action.GetDroppedInTarget(), func(id int64, _ int) *datapb.SegmentInfo { - return &datapb.SegmentInfo{ - ID: id, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: typeutil.MaxTimestamp, - }, - } + droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) { + return id, typeutil.MaxTimestamp }) - pipeline.ExcludedSegments(droppedInfos...) + + pipeline.ExcludedSegments(droppedInfos) } shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(), action.GetSealedInTarget(), action.GetDroppedInTarget())