fix: exclude insertData before growing checkpoint (#29558)

Resolves: #29556
- Refine the excluded-segment function signature
- Add logic to exclude growing-segment insert data before the checkpoint

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/29574/head
congqixia 2023-12-28 18:18:54 +08:00 committed by GitHub
parent ed644983e2
commit a8b7629315
5 changed files with 34 additions and 56 deletions
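
At a high level, this change stops the filter node from re-consuming insert data that the query node has already loaded up to a growing segment's checkpoint: every excluded segment is now paired with a timestamp, and only messages newer than that timestamp pass through, while flushed and dropped segments keep being excluded entirely via the maximum timestamp. A minimal, self-contained sketch of that rule (helper names such as `buildExcludeInfo` and the literal IDs/timestamps are illustrative, not Milvus APIs):

```go
package main

import (
	"fmt"
	"math"
)

// maxTimestamp stands in for typeutil.MaxTimestamp: every insert message is at
// or below it, so a segment mapped to it is excluded entirely.
const maxTimestamp = uint64(math.MaxUint64)

// buildExcludeInfo is a hypothetical helper mirroring the maps built in
// WatchDmChannels below: growing segments map to their checkpoint timestamp,
// flushed and dropped segments map to maxTimestamp.
func buildExcludeInfo(growing map[int64]uint64, flushed, dropped []int64) map[int64]uint64 {
	info := make(map[int64]uint64, len(growing)+len(flushed)+len(dropped))
	for id, checkpointTs := range growing {
		info[id] = checkpointTs
	}
	for _, id := range flushed {
		info[id] = maxTimestamp
	}
	for _, id := range dropped {
		info[id] = maxTimestamp
	}
	return info
}

func main() {
	// Growing segment 100 has a checkpoint at ts=50; segment 200 is flushed.
	info := buildExcludeInfo(map[int64]uint64{100: 50}, []int64{200}, nil)
	fmt.Println(info[100], info[200]) // 50 18446744073709551615
}
```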

View File

@@ -23,7 +23,6 @@ import (
     "go.uber.org/zap"

     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
-    "github.com/milvus-io/milvus/internal/proto/datapb"
     base "github.com/milvus-io/milvus/internal/util/pipeline"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/metrics"
@@ -39,7 +38,7 @@ type filterNode struct {
     *BaseNode
     collectionID UniqueID
     manager *DataManager
-    excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo]
+    excludedSegments *typeutil.ConcurrentMap[int64, uint64]
     channel string
     InsertMsgPolicys []InsertMsgFilter
     DeleteMsgPolicys []DeleteMsgFilter
@@ -134,7 +133,7 @@ func newFilterNode(
     collectionID int64,
     channel string,
     manager *DataManager,
-    excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo],
+    excludedSegments *typeutil.ConcurrentMap[int64, uint64],
     maxQueueLength int32,
 ) *filterNode {
     return &filterNode{
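
The new map type relies only on `typeutil.ConcurrentMap`'s `Insert` and `Get`, the two calls visible in this diff. For readers unfamiliar with that helper, here is a toy stand-in with the same shape (a mutex-guarded generic map, not the actual `typeutil` implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// concurrentMap is a toy stand-in for typeutil.ConcurrentMap[K, V]: a
// mutex-guarded map exposing the Insert/Get calls used in this diff.
type concurrentMap[K comparable, V any] struct {
	mu   sync.RWMutex
	data map[K]V
}

func newConcurrentMap[K comparable, V any]() *concurrentMap[K, V] {
	return &concurrentMap[K, V]{data: make(map[K]V)}
}

func (m *concurrentMap[K, V]) Insert(key K, value V) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] = value
}

func (m *concurrentMap[K, V]) Get(key K) (V, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.data[key]
	return v, ok
}

func main() {
	// Same shape as excludedSegments after this change: segment ID -> exclude-until ts.
	excluded := newConcurrentMap[int64, uint64]()
	excluded.Insert(100, 50)
	ts, ok := excluded.Get(100)
	fmt.Println(ts, ok) // 50 true
}
```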

View File

@@ -22,8 +22,6 @@ import (
     "github.com/samber/lo"
     "github.com/stretchr/testify/suite"

-    "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
-    "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/querypb"
     "github.com/milvus-io/milvus/internal/querynodev2/segments"
     "github.com/milvus-io/milvus/pkg/mq/msgstream"
@@ -40,7 +38,7 @@ type FilterNodeSuite struct {
     channel string
     validSegmentIDs []int64
-    excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo]
+    excludedSegments *typeutil.ConcurrentMap[int64, uint64]
     excludedSegmentIDs []int64
     insertSegmentIDs []int64
     deleteSegmentSum int
@@ -64,13 +62,9 @@ func (suite *FilterNodeSuite) SetupSuite() {
     suite.errSegmentID = 7

     // init excludedSegment
-    suite.excludedSegments = typeutil.NewConcurrentMap[int64, *datapb.SegmentInfo]()
+    suite.excludedSegments = typeutil.NewConcurrentMap[int64, uint64]()
     for _, id := range suite.excludedSegmentIDs {
-        suite.excludedSegments.Insert(id, &datapb.SegmentInfo{
-            DmlPosition: &msgpb.MsgPosition{
-                Timestamp: 1,
-            },
-        })
+        suite.excludedSegments.Insert(id, 1)
     }
 }

View File

@@ -54,11 +54,11 @@ func InsertOutOfTarget(n *filterNode, c *Collection, msg *InsertMsg) error {
 }

 func InsertExcluded(n *filterNode, c *Collection, msg *InsertMsg) error {
-    segInfo, ok := n.excludedSegments.Get(msg.SegmentID)
+    ts, ok := n.excludedSegments.Get(msg.SegmentID)
     if !ok {
         return nil
     }
-    if msg.EndTimestamp <= segInfo.GetDmlPosition().GetTimestamp() {
+    if msg.EndTimestamp <= ts {
         m := fmt.Sprintf("Segment excluded, id: %d", msg.GetSegmentID())
         return merr.WrapErrSegmentLack(msg.GetSegmentID(), m)
     }
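
With the value type reduced to a bare timestamp, the exclusion check becomes a single comparison. A simplified, runnable restatement of `InsertExcluded` (the map literal and IDs are illustrative, and the real code returns `merr.WrapErrSegmentLack` rather than a bool):

```go
package main

import "fmt"

// insertExcluded mirrors the new InsertExcluded logic: a segment entry holds
// the timestamp up to which insert data is already covered; messages ending
// at or before it are dropped, later ones pass through.
func insertExcluded(excludedUntil map[int64]uint64, segmentID int64, msgEndTs uint64) bool {
	ts, ok := excludedUntil[segmentID]
	if !ok {
		return false // segment not excluded at all
	}
	return msgEndTs <= ts // data before the checkpoint is already loaded
}

func main() {
	excludedUntil := map[int64]uint64{100: 50}
	fmt.Println(insertExcluded(excludedUntil, 100, 40)) // true: covered by the checkpoint
	fmt.Println(insertExcluded(excludedUntil, 100, 60)) // false: newer data is still consumed
	fmt.Println(insertExcluded(excludedUntil, 200, 10)) // false: not an excluded segment
}
```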

View File

@@ -19,7 +19,6 @@ package pipeline
 import (
     "go.uber.org/zap"

-    "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/querynodev2/delegator"
     base "github.com/milvus-io/milvus/internal/util/pipeline"
     "github.com/milvus-io/milvus/pkg/log"
@@ -32,23 +31,23 @@
 // pipeline used for querynode
 type Pipeline interface {
     base.StreamPipeline
-    ExcludedSegments(segInfos ...*datapb.SegmentInfo)
+    ExcludedSegments(info map[int64]uint64)
 }

 type pipeline struct {
     base.StreamPipeline

-    excludedSegments *typeutil.ConcurrentMap[int64, *datapb.SegmentInfo]
+    excludedSegments *typeutil.ConcurrentMap[int64, uint64]
     collectionID UniqueID
 }

-func (p *pipeline) ExcludedSegments(segInfos ...*datapb.SegmentInfo) {
-    for _, segInfo := range segInfos {
+func (p *pipeline) ExcludedSegments(excludeInfo map[int64]uint64) { //(segInfos ...*datapb.SegmentInfo) {
+    for segmentID, ts := range excludeInfo {
         log.Debug("pipeline add exclude info",
-            zap.Int64("segmentID", segInfo.GetID()),
-            zap.Uint64("ts", segInfo.GetDmlPosition().GetTimestamp()),
+            zap.Int64("segmentID", segmentID),
+            zap.Uint64("ts", ts),
         )
-        p.excludedSegments.Insert(segInfo.GetID(), segInfo)
+        p.excludedSegments.Insert(segmentID, ts)
     }
 }
@@ -66,7 +65,7 @@ func NewPipeLine(
     delegator delegator.ShardDelegator,
 ) (Pipeline, error) {
     pipelineQueueLength := paramtable.Get().QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()
-    excludedSegments := typeutil.NewConcurrentMap[int64, *datapb.SegmentInfo]()
+    excludedSegments := typeutil.NewConcurrentMap[int64, uint64]()

     p := &pipeline{
         collectionID: collectionID,
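
Callers now hand the pipeline a plain `map[int64]uint64` instead of variadic `*datapb.SegmentInfo`. A hedged usage sketch against a stub implementation (the stub only records entries; the real pipeline inserts them into the concurrent map shown above):

```go
package main

import "fmt"

// Pipeline mirrors the reduced interface from this diff: exclusion info is a
// segment ID -> exclude-until timestamp map.
type Pipeline interface {
	ExcludedSegments(info map[int64]uint64)
}

// stubPipeline is an illustrative implementation that just records the entries.
type stubPipeline struct {
	excluded map[int64]uint64
}

func (p *stubPipeline) ExcludedSegments(info map[int64]uint64) {
	for segmentID, ts := range info {
		p.excluded[segmentID] = ts
	}
}

func main() {
	var p Pipeline = &stubPipeline{excluded: map[int64]uint64{}}
	// Growing segment 100 excluded up to its checkpoint, flushed segment 200 entirely.
	p.ExcludedSegments(map[int64]uint64{100: 50, 200: ^uint64(0)})
	fmt.Println(p.(*stubPipeline).excluded)
}
```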

View File

@@ -32,7 +32,6 @@ import (
     "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
     "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-    "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/indexpb"
     "github.com/milvus-io/milvus/internal/proto/internalpb"
     "github.com/milvus-io/milvus/internal/proto/querypb"
@@ -301,25 +300,21 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
         }
     }()

-    flushedSegments := lo.Map(channel.GetFlushedSegmentIds(), func(id int64, _ int) *datapb.SegmentInfo {
-        return &datapb.SegmentInfo{
-            ID: id,
-            DmlPosition: &msgpb.MsgPosition{
-                Timestamp: typeutil.MaxTimestamp,
-            },
-        }
+    growingInfo := lo.SliceToMap(channel.GetUnflushedSegmentIds(), func(id int64) (int64, uint64) {
+        info := req.GetSegmentInfos()[id]
+        return id, info.GetDmlPosition().GetTimestamp()
     })
-    pipeline.ExcludedSegments(flushedSegments...)
+    pipeline.ExcludedSegments(growingInfo)
+
+    flushedInfo := lo.SliceToMap(channel.GetFlushedSegmentIds(), func(id int64) (int64, uint64) {
+        return id, typeutil.MaxTimestamp
+    })
+    pipeline.ExcludedSegments(flushedInfo)
     for _, channelInfo := range req.GetInfos() {
-        droppedInfos := lo.Map(channelInfo.GetDroppedSegmentIds(), func(id int64, _ int) *datapb.SegmentInfo {
-            return &datapb.SegmentInfo{
-                ID: id,
-                DmlPosition: &msgpb.MsgPosition{
-                    Timestamp: typeutil.MaxTimestamp,
-                },
-            }
+        droppedInfos := lo.SliceToMap(channelInfo.GetDroppedSegmentIds(), func(id int64) (int64, uint64) {
+            return id, typeutil.MaxTimestamp
         })
-        pipeline.ExcludedSegments(droppedInfos...)
+        pipeline.ExcludedSegments(droppedInfos)
     }

     err = loadL0Segments(ctx, delegator, req)
@@ -574,15 +569,10 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release
     // in case of consumed it's growing segment again
     pipeline := node.pipelineManager.Get(req.GetShard())
     if pipeline != nil {
-        droppedInfos := lo.Map(req.GetSegmentIDs(), func(id int64, _ int) *datapb.SegmentInfo {
-            return &datapb.SegmentInfo{
-                ID: id,
-                DmlPosition: &msgpb.MsgPosition{
-                    Timestamp: typeutil.MaxTimestamp,
-                },
-            }
+        droppedInfos := lo.SliceToMap(req.GetSegmentIDs(), func(id int64) (int64, uint64) {
+            return id, typeutil.MaxTimestamp
         })
-        pipeline.ExcludedSegments(droppedInfos...)
+        pipeline.ExcludedSegments(droppedInfos)
     }

     req.NeedTransfer = false
@@ -1380,15 +1370,11 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
     log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()))
     pipeline := node.pipelineManager.Get(req.GetChannel())
     if pipeline != nil {
-        droppedInfos := lo.Map(action.GetDroppedInTarget(), func(id int64, _ int) *datapb.SegmentInfo {
-            return &datapb.SegmentInfo{
-                ID: id,
-                DmlPosition: &msgpb.MsgPosition{
-                    Timestamp: typeutil.MaxTimestamp,
-                },
-            }
+        droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) {
+            return id, typeutil.MaxTimestamp
         })
-        pipeline.ExcludedSegments(droppedInfos...)
+        pipeline.ExcludedSegments(droppedInfos)
     }
     shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(),
         action.GetSealedInTarget(), action.GetDroppedInTarget())
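
All three service-layer call sites build the exclusion map the same way, using `lo.SliceToMap` from `github.com/samber/lo`. A standalone sketch of that pattern, with illustrative segment IDs and `math.MaxUint64` standing in for `typeutil.MaxTimestamp`:

```go
package main

import (
	"fmt"
	"math"

	"github.com/samber/lo"
)

func main() {
	droppedIDs := []int64{1, 2, 3}

	// Same shape as the WatchDmChannels / ReleaseSegments / SyncDistribution
	// call sites: every dropped or flushed segment maps to the max timestamp,
	// so all of its insert messages are filtered out.
	droppedInfos := lo.SliceToMap(droppedIDs, func(id int64) (int64, uint64) {
		return id, uint64(math.MaxUint64)
	})

	fmt.Println(droppedInfos) // map[1:18446744073709551615 2:... 3:...]
}
```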