mirror of https://github.com/milvus-io/milvus.git
fix: query l0 segment bugs (#28558)
relate: https://github.com/milvus-io/milvus/issues/27675 --------- Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/28576/head
parent
da03feb79a
commit
13a5b9f64a
|
@ -815,7 +815,7 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
|
|||
}
|
||||
|
||||
binlogs := segment.GetBinlogs()
|
||||
if len(binlogs) == 0 {
|
||||
if len(binlogs) == 0 && segment.GetLevel() != datapb.SegmentLevel_L0 {
|
||||
continue
|
||||
}
|
||||
rowCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo)
|
||||
|
|
|
@ -206,7 +206,7 @@ func (mgr *TargetManager) PullNextTargetV2(broker Broker, collectionID int64, ch
|
|||
|
||||
partitionSet := typeutil.NewUniqueSet(chosenPartitionIDs...)
|
||||
for _, segmentInfo := range segmentInfos {
|
||||
if partitionSet.Contain(segmentInfo.GetPartitionID()) {
|
||||
if partitionSet.Contain(segmentInfo.GetPartitionID()) && segmentInfo.GetPartitionID() != -1 {
|
||||
segments[segmentInfo.GetID()] = segmentInfo
|
||||
}
|
||||
}
|
||||
|
|
|
@ -115,6 +115,7 @@ func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb
|
|||
IndexInfos: indexes,
|
||||
StartPosition: segment.GetStartPosition(),
|
||||
DeltaPosition: deltaPosition,
|
||||
Level: segment.GetLevel(),
|
||||
}
|
||||
loadInfo.SegmentSize = calculateSegmentSize(loadInfo)
|
||||
return loadInfo
|
||||
|
|
|
@ -346,6 +346,11 @@ func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter)
|
|||
growing, growingExist := mgr.growingSegments[id]
|
||||
sealed, sealedExist := mgr.sealedSegments[id]
|
||||
|
||||
// L0 Segment should not be queryable.
|
||||
if sealedExist && sealed.Level() == datapb.SegmentLevel_L0 {
|
||||
continue
|
||||
}
|
||||
|
||||
growingExist = growingExist && filter(growing, filters...)
|
||||
sealedExist = sealedExist && filter(sealed, filters...)
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package segments
|
|||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
|
@ -22,17 +23,19 @@ type ManagerSuite struct {
|
|||
channels []string
|
||||
types []SegmentType
|
||||
segments []Segment
|
||||
levels []datapb.SegmentLevel
|
||||
|
||||
mgr *segmentManager
|
||||
}
|
||||
|
||||
func (s *ManagerSuite) SetupSuite() {
|
||||
paramtable.Init()
|
||||
s.segmentIDs = []int64{1, 2, 3}
|
||||
s.collectionIDs = []int64{100, 200, 300}
|
||||
s.partitionIDs = []int64{10, 11, 12}
|
||||
s.channels = []string{"dml1", "dml2", "dml3"}
|
||||
s.types = []SegmentType{SegmentTypeSealed, SegmentTypeGrowing, SegmentTypeSealed}
|
||||
s.segmentIDs = []int64{1, 2, 3, 4}
|
||||
s.collectionIDs = []int64{100, 200, 300, 400}
|
||||
s.partitionIDs = []int64{10, 11, 12, 13}
|
||||
s.channels = []string{"dml1", "dml2", "dml3", "dml4"}
|
||||
s.types = []SegmentType{SegmentTypeSealed, SegmentTypeGrowing, SegmentTypeSealed, SegmentTypeSealed}
|
||||
s.levels = []datapb.SegmentLevel{datapb.SegmentLevel_Legacy, datapb.SegmentLevel_Legacy, datapb.SegmentLevel_L1, datapb.SegmentLevel_L0}
|
||||
}
|
||||
|
||||
func (s *ManagerSuite) SetupTest() {
|
||||
|
@ -50,7 +53,7 @@ func (s *ManagerSuite) SetupTest() {
|
|||
0,
|
||||
nil,
|
||||
nil,
|
||||
datapb.SegmentLevel_Legacy,
|
||||
s.levels[i],
|
||||
)
|
||||
s.Require().NoError(err)
|
||||
s.segments = append(s.segments, segment)
|
||||
|
@ -62,17 +65,18 @@ func (s *ManagerSuite) SetupTest() {
|
|||
func (s *ManagerSuite) TestGetBy() {
|
||||
for i, partitionID := range s.partitionIDs {
|
||||
segments := s.mgr.GetBy(WithPartition(partitionID))
|
||||
s.Contains(segments, s.segments[i])
|
||||
s.Contains(
|
||||
lo.Map(segments, func(segment Segment, _ int) int64 { return segment.ID() }), s.segmentIDs[i])
|
||||
}
|
||||
|
||||
for i, channel := range s.channels {
|
||||
segments := s.mgr.GetBy(WithChannel(channel))
|
||||
s.Contains(segments, s.segments[i])
|
||||
s.Contains(lo.Map(segments, func(segment Segment, _ int) int64 { return segment.ID() }), s.segmentIDs[i])
|
||||
}
|
||||
|
||||
for i, typ := range s.types {
|
||||
segments := s.mgr.GetBy(WithType(typ))
|
||||
s.Contains(segments, s.segments[i])
|
||||
s.Contains(lo.Map(segments, func(segment Segment, _ int) int64 { return segment.ID() }), s.segmentIDs[i])
|
||||
}
|
||||
s.mgr.Clear()
|
||||
|
||||
|
@ -82,6 +86,13 @@ func (s *ManagerSuite) TestGetBy() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *ManagerSuite) TestGetAndPin() {
|
||||
// get and pin will ignore L0 segment
|
||||
segments, err := s.mgr.GetAndPin(lo.Filter(s.segmentIDs, func(_ int64, id int) bool { return s.levels[id] == datapb.SegmentLevel_L0 }))
|
||||
s.NoError(err)
|
||||
s.Equal(len(segments), 0)
|
||||
}
|
||||
|
||||
func (s *ManagerSuite) TestRemoveGrowing() {
|
||||
for i, id := range s.segmentIDs {
|
||||
isGrowing := s.types[i] == SegmentTypeGrowing
|
||||
|
@ -117,8 +128,8 @@ func (s *ManagerSuite) TestRemoveBy() {
|
|||
func (s *ManagerSuite) TestUpdateBy() {
|
||||
action := IncreaseVersion(1)
|
||||
|
||||
s.Equal(2, s.mgr.UpdateBy(action, WithType(SegmentTypeSealed)))
|
||||
s.Equal(1, s.mgr.UpdateBy(action, WithType(SegmentTypeGrowing)))
|
||||
s.Equal(lo.Count(s.types, SegmentTypeSealed), s.mgr.UpdateBy(action, WithType(SegmentTypeSealed)))
|
||||
s.Equal(lo.Count(s.types, SegmentTypeGrowing), s.mgr.UpdateBy(action, WithType(SegmentTypeGrowing)))
|
||||
|
||||
segments := s.mgr.GetBy()
|
||||
for _, segment := range segments {
|
||||
|
|
Loading…
Reference in New Issue