diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 4ee5775486..65a3b926cb 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -815,7 +815,7 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI } binlogs := segment.GetBinlogs() - if len(binlogs) == 0 { + if len(binlogs) == 0 && segment.GetLevel() != datapb.SegmentLevel_L0 { continue } rowCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo) diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index e6772ede82..5485df5e5c 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -206,7 +206,7 @@ func (mgr *TargetManager) PullNextTargetV2(broker Broker, collectionID int64, ch partitionSet := typeutil.NewUniqueSet(chosenPartitionIDs...) for _, segmentInfo := range segmentInfos { - if partitionSet.Contain(segmentInfo.GetPartitionID()) { + if partitionSet.Contain(segmentInfo.GetPartitionID()) && segmentInfo.GetPartitionID() != -1 { segments[segmentInfo.GetID()] = segmentInfo } } diff --git a/internal/querycoordv2/utils/types.go b/internal/querycoordv2/utils/types.go index 7920f10f64..9365ec9f0d 100644 --- a/internal/querycoordv2/utils/types.go +++ b/internal/querycoordv2/utils/types.go @@ -115,6 +115,7 @@ func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb IndexInfos: indexes, StartPosition: segment.GetStartPosition(), DeltaPosition: deltaPosition, + Level: segment.GetLevel(), } loadInfo.SegmentSize = calculateSegmentSize(loadInfo) return loadInfo diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 853b43a467..158d522939 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -346,6 +346,11 @@ func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter) growing, growingExist := mgr.growingSegments[id] sealed, sealedExist := mgr.sealedSegments[id] + // L0 Segment should not be queryable. + if sealedExist && sealed.Level() == datapb.SegmentLevel_L0 { + continue + } + growingExist = growingExist && filter(growing, filters...) sealedExist = sealedExist && filter(sealed, filters...) diff --git a/internal/querynodev2/segments/manager_test.go b/internal/querynodev2/segments/manager_test.go index b5a0180a5a..4e75e71a7d 100644 --- a/internal/querynodev2/segments/manager_test.go +++ b/internal/querynodev2/segments/manager_test.go @@ -3,6 +3,7 @@ package segments import ( "testing" + "github.com/samber/lo" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -22,17 +23,19 @@ type ManagerSuite struct { channels []string types []SegmentType segments []Segment + levels []datapb.SegmentLevel mgr *segmentManager } func (s *ManagerSuite) SetupSuite() { paramtable.Init() - s.segmentIDs = []int64{1, 2, 3} - s.collectionIDs = []int64{100, 200, 300} - s.partitionIDs = []int64{10, 11, 12} - s.channels = []string{"dml1", "dml2", "dml3"} - s.types = []SegmentType{SegmentTypeSealed, SegmentTypeGrowing, SegmentTypeSealed} + s.segmentIDs = []int64{1, 2, 3, 4} + s.collectionIDs = []int64{100, 200, 300, 400} + s.partitionIDs = []int64{10, 11, 12, 13} + s.channels = []string{"dml1", "dml2", "dml3", "dml4"} + s.types = []SegmentType{SegmentTypeSealed, SegmentTypeGrowing, SegmentTypeSealed, SegmentTypeSealed} + s.levels = []datapb.SegmentLevel{datapb.SegmentLevel_Legacy, datapb.SegmentLevel_Legacy, datapb.SegmentLevel_L1, datapb.SegmentLevel_L0} } func (s *ManagerSuite) SetupTest() { @@ -50,7 +53,7 @@ func (s *ManagerSuite) SetupTest() { 0, nil, nil, - datapb.SegmentLevel_Legacy, + s.levels[i], ) s.Require().NoError(err) s.segments = append(s.segments, segment) @@ -62,17 +65,18 @@ func (s *ManagerSuite) SetupTest() { func (s *ManagerSuite) TestGetBy() { for i, partitionID := range s.partitionIDs { segments := s.mgr.GetBy(WithPartition(partitionID)) - s.Contains(segments, s.segments[i]) + s.Contains( + lo.Map(segments, func(segment Segment, _ int) int64 { return segment.ID() }), s.segmentIDs[i]) } for i, channel := range s.channels { segments := s.mgr.GetBy(WithChannel(channel)) - s.Contains(segments, s.segments[i]) + s.Contains(lo.Map(segments, func(segment Segment, _ int) int64 { return segment.ID() }), s.segmentIDs[i]) } for i, typ := range s.types { segments := s.mgr.GetBy(WithType(typ)) - s.Contains(segments, s.segments[i]) + s.Contains(lo.Map(segments, func(segment Segment, _ int) int64 { return segment.ID() }), s.segmentIDs[i]) } s.mgr.Clear() @@ -82,6 +86,13 @@ func (s *ManagerSuite) TestGetBy() { } } +func (s *ManagerSuite) TestGetAndPin() { + // get and pin will ignore L0 segment + segments, err := s.mgr.GetAndPin(lo.Filter(s.segmentIDs, func(_ int64, id int) bool { return s.levels[id] == datapb.SegmentLevel_L0 })) + s.NoError(err) + s.Equal(len(segments), 0) +} + func (s *ManagerSuite) TestRemoveGrowing() { for i, id := range s.segmentIDs { isGrowing := s.types[i] == SegmentTypeGrowing @@ -117,8 +128,8 @@ func (s *ManagerSuite) TestRemoveBy() { func (s *ManagerSuite) TestUpdateBy() { action := IncreaseVersion(1) - s.Equal(2, s.mgr.UpdateBy(action, WithType(SegmentTypeSealed))) - s.Equal(1, s.mgr.UpdateBy(action, WithType(SegmentTypeGrowing))) + s.Equal(lo.Count(s.types, SegmentTypeSealed), s.mgr.UpdateBy(action, WithType(SegmentTypeSealed))) + s.Equal(lo.Count(s.types, SegmentTypeGrowing), s.mgr.UpdateBy(action, WithType(SegmentTypeGrowing))) segments := s.mgr.GetBy() for _, segment := range segments {