mirror of https://github.com/milvus-io/milvus.git
fix: level 0 segments not loaded (#29908)
the recent changes move the level 0 segments list to a new proto field, which leads to the QueryCoord can't see the level 0 segments, handle the new changes fix #29907 Signed-off-by: yah01 <yang.cen@zilliz.com>pull/30020/head
parent
2dacca7b84
commit
c68c128e47
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
|
@ -205,6 +206,15 @@ func (mgr *TargetManager) PullNextTargetV2(broker Broker, collectionID int64, ch
|
|||
|
||||
for _, info := range vChannelInfos {
|
||||
channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info)
|
||||
for _, segmentID := range info.GetLevelZeroSegmentIds() {
|
||||
segments[segmentID] = &datapb.SegmentInfo{
|
||||
ID: segmentID,
|
||||
CollectionID: collectionID,
|
||||
InsertChannel: info.GetChannelName(),
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
partitionSet := typeutil.NewUniqueSet(chosenPartitionIDs...)
|
||||
|
|
|
@ -44,10 +44,11 @@ type TargetManagerSuite struct {
|
|||
suite.Suite
|
||||
|
||||
// Data
|
||||
collections []int64
|
||||
partitions map[int64][]int64
|
||||
channels map[int64][]string
|
||||
segments map[int64]map[int64][]int64 // CollectionID, PartitionID -> Segments
|
||||
collections []int64
|
||||
partitions map[int64][]int64
|
||||
channels map[int64][]string
|
||||
segments map[int64]map[int64][]int64 // CollectionID, PartitionID -> Segments
|
||||
level0Segments []int64
|
||||
// Derived data
|
||||
allChannels []string
|
||||
allSegments []int64
|
||||
|
@ -80,6 +81,7 @@ func (suite *TargetManagerSuite) SetupSuite() {
|
|||
103: {7, 8},
|
||||
},
|
||||
}
|
||||
suite.level0Segments = []int64{10000, 10001}
|
||||
|
||||
suite.allChannels = make([]string, 0)
|
||||
suite.allSegments = make([]int64, 0)
|
||||
|
@ -118,8 +120,9 @@ func (suite *TargetManagerSuite) SetupTest() {
|
|||
dmChannels := make([]*datapb.VchannelInfo, 0)
|
||||
for _, channel := range suite.channels[collection] {
|
||||
dmChannels = append(dmChannels, &datapb.VchannelInfo{
|
||||
CollectionID: collection,
|
||||
ChannelName: channel,
|
||||
CollectionID: collection,
|
||||
ChannelName: channel,
|
||||
LevelZeroSegmentIds: suite.level0Segments,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -265,7 +268,7 @@ func (suite *TargetManagerSuite) TestRemovePartition() {
|
|||
suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(collectionID, CurrentTarget))
|
||||
|
||||
suite.mgr.RemovePartition(collectionID, 100)
|
||||
suite.assertSegments([]int64{3, 4}, suite.mgr.GetSealedSegmentsByCollection(collectionID, NextTarget))
|
||||
suite.assertSegments(append([]int64{3, 4}, suite.level0Segments...), suite.mgr.GetSealedSegmentsByCollection(collectionID, NextTarget))
|
||||
suite.assertChannels(suite.channels[collectionID], suite.mgr.GetDmChannelsByCollection(collectionID, NextTarget))
|
||||
suite.assertSegments([]int64{}, suite.mgr.GetSealedSegmentsByCollection(collectionID, CurrentTarget))
|
||||
suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(collectionID, CurrentTarget))
|
||||
|
@ -310,7 +313,7 @@ func (suite *TargetManagerSuite) getAllSegment(collectionID int64, partitionIDs
|
|||
}
|
||||
}
|
||||
|
||||
return allSegments
|
||||
return append(allSegments, suite.level0Segments...)
|
||||
}
|
||||
|
||||
func (suite *TargetManagerSuite) assertChannels(expected []string, actual map[string]*DmChannel) bool {
|
||||
|
|
|
@ -190,6 +190,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
|
|||
return err
|
||||
}
|
||||
segment := resp.GetInfos()[0]
|
||||
log = log.With(zap.String("level", segment.GetLevel().String()))
|
||||
|
||||
indexes, err := ex.broker.GetIndexInfo(ctx, task.CollectionID(), segment.GetID())
|
||||
if err != nil {
|
||||
|
@ -224,7 +225,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
|
|||
segmentIndex.IndexParams = funcutil.Map2KeyValuePair(params)
|
||||
}
|
||||
|
||||
loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), indexes)
|
||||
loadInfo := utils.PackSegmentLoadInfo(segment, channel.GetSeekPosition(), indexes)
|
||||
|
||||
req := packLoadSegmentRequest(
|
||||
task,
|
||||
|
|
Loading…
Reference in New Issue