enhance: Exclude L0 segment from readable snapshot (#35507)

L0 segments now do not contain insert data and may cause confusion for
query hook optimizer if counted as sealed segment number.

This PR add segment level flag in segment entry and exclude L0 segments
while get readable segment snaphsot

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/35525/head
congqixia 2024-08-16 15:28:53 +08:00 committed by GitHub
parent 5e9097c2dd
commit f87af9bc54
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 31 additions and 20 deletions

View File

@ -526,6 +526,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
PartitionID: info.GetPartitionID(),
NodeID: req.GetDstNodeID(),
Version: req.GetVersion(),
Level: info.GetLevel(),
}
})
if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {

View File

@ -509,6 +509,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
Level: datapb.SegmentLevel_L1,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
@ -524,6 +525,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
Level: datapb.SegmentLevel_L1,
},
}, sealed[0].Segments)
})
@ -599,20 +601,21 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
})
s.NoError(err)
err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: commonpbutil.NewMsgBase(),
DstNodeID: 1,
CollectionID: s.collectionID,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: 200,
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
// err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
// Base: commonpbutil.NewMsgBase(),
// DstNodeID: 1,
// CollectionID: s.collectionID,
// Infos: []*querypb.SegmentLoadInfo{
// {
// SegmentID: 200,
// PartitionID: 500,
// StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
// DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
// Level: datapb.SegmentLevel_L1,
// InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
// },
// },
// })
s.NoError(err)
sealed, _ := s.delegator.GetSegmentInfo(false)
@ -624,12 +627,14 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
Level: datapb.SegmentLevel_L1,
},
{
SegmentID: 200,
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
Level: datapb.SegmentLevel_L0,
},
}, sealed[0].Segments)
})

View File

@ -23,6 +23,7 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -84,6 +85,7 @@ type SegmentEntry struct {
PartitionID UniqueID
Version int64
TargetVersion int64
Level datapb.SegmentLevel
}
// NewDistribution creates a new distribution instance with all field initialized.
@ -114,9 +116,7 @@ func (d *distribution) PinReadableSegments(partitions ...int64) (sealed []Snapsh
sealed, growing = current.Get(partitions...)
version = current.version
targetVersion := current.GetTargetVersion()
filterReadable := func(entry SegmentEntry, _ int) bool {
return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
}
filterReadable := d.readableFilter(targetVersion)
sealed, growing = d.filterSegments(sealed, growing, filterReadable)
return
}
@ -157,9 +157,7 @@ func (d *distribution) PeekSegments(readable bool, partitions ...int64) (sealed
if readable {
targetVersion := current.GetTargetVersion()
filterReadable := func(entry SegmentEntry, _ int) bool {
return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
}
filterReadable := d.readableFilter(targetVersion)
sealed, growing = d.filterSegments(sealed, growing, filterReadable)
return
}
@ -382,6 +380,13 @@ func (d *distribution) genSnapshot() chan struct{} {
return last.cleared
}
func (d *distribution) readableFilter(targetVersion int64) func(entry SegmentEntry, _ int) bool {
return func(entry SegmentEntry, _ int) bool {
// segment L0 is not readable for now
return entry.Level != datapb.SegmentLevel_L0 && (entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion)
}
}
// getCleanup returns cleanup snapshots function.
func (d *distribution) getCleanup(version int64) snapshotCleanup {
return func() {