mirror of https://github.com/milvus-io/milvus.git
fix: Fill in info in CompactionSegmentBinlogs (#30279)
After #28873, PartitionID and CollectionID should be filled in CompactionSegmentBinlog so that DataNode can compose the correct logPath. However There're some places left forgotten to fill in the information, causing Datanode downloading `xxx/0/0/xxxx/xxxx` binlogs during compaction See also: #30213 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/30304/head
parent
405877c8cd
commit
0b6beb7e0f
|
@ -316,8 +316,10 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) {
|
||||||
|
|
||||||
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
|
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
|
||||||
return &datapb.CompactionSegmentBinlogs{
|
return &datapb.CompactionSegmentBinlogs{
|
||||||
SegmentID: info.GetID(),
|
SegmentID: info.GetID(),
|
||||||
Level: datapb.SegmentLevel_L1,
|
Level: datapb.SegmentLevel_L1,
|
||||||
|
CollectionID: info.GetCollectionID(),
|
||||||
|
PartitionID: info.GetPartitionID(),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -701,6 +701,9 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i
|
||||||
FieldBinlogs: s.GetBinlogs(),
|
FieldBinlogs: s.GetBinlogs(),
|
||||||
Field2StatslogPaths: s.GetStatslogs(),
|
Field2StatslogPaths: s.GetStatslogs(),
|
||||||
Deltalogs: s.GetDeltalogs(),
|
Deltalogs: s.GetDeltalogs(),
|
||||||
|
Level: s.GetLevel(),
|
||||||
|
CollectionID: s.GetCollectionID(),
|
||||||
|
PartitionID: s.GetPartitionID(),
|
||||||
}
|
}
|
||||||
plan.TotalRows += s.GetNumOfRows()
|
plan.TotalRows += s.GetNumOfRows()
|
||||||
plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinLogs)
|
plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinLogs)
|
||||||
|
|
|
@ -89,14 +89,14 @@ func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionT
|
||||||
|
|
||||||
func (m *CompactionTriggerManager) BuildLevelZeroCompactionPlan(view CompactionView) *datapb.CompactionPlan {
|
func (m *CompactionTriggerManager) BuildLevelZeroCompactionPlan(view CompactionView) *datapb.CompactionPlan {
|
||||||
var segmentBinlogs []*datapb.CompactionSegmentBinlogs
|
var segmentBinlogs []*datapb.CompactionSegmentBinlogs
|
||||||
levelZeroSegs := lo.Map(view.GetSegmentsView(), func(v *SegmentView, _ int) *datapb.CompactionSegmentBinlogs {
|
levelZeroSegs := lo.Map(view.GetSegmentsView(), func(segView *SegmentView, _ int) *datapb.CompactionSegmentBinlogs {
|
||||||
s := m.meta.GetSegment(v.ID)
|
s := m.meta.GetSegment(segView.ID)
|
||||||
return &datapb.CompactionSegmentBinlogs{
|
return &datapb.CompactionSegmentBinlogs{
|
||||||
SegmentID: s.GetID(),
|
SegmentID: segView.ID,
|
||||||
Deltalogs: s.GetDeltalogs(),
|
Deltalogs: s.GetDeltalogs(),
|
||||||
Level: datapb.SegmentLevel_L0,
|
Level: datapb.SegmentLevel_L0,
|
||||||
CollectionID: s.GetCollectionID(),
|
CollectionID: view.GetGroupLabel().CollectionID,
|
||||||
PartitionID: s.GetPartitionID(),
|
PartitionID: view.GetGroupLabel().PartitionID,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
segmentBinlogs = append(segmentBinlogs, levelZeroSegs...)
|
segmentBinlogs = append(segmentBinlogs, levelZeroSegs...)
|
||||||
|
|
Loading…
Reference in New Issue