mirror of https://github.com/milvus-io/milvus.git
enhance: Refine compaction interfaces to support major compaction (#30632)
Refine compaction interfaces in datacoord, support compaction result with more than one segment. Prepare for major compaction. related: #30633 Signed-off-by: wayblink <anyang.wang@zilliz.com>pull/30473/head
parent
75a1610f0b
commit
b74264881c
|
@ -80,7 +80,7 @@ type CompactionMeta interface {
|
|||
UpdateSegmentsInfo(operators ...UpdateOperator) error
|
||||
SetSegmentCompacting(segmentID int64, compacting bool)
|
||||
|
||||
CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) (*SegmentInfo, *segMetricMutation, error)
|
||||
CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)
|
||||
}
|
||||
|
||||
var _ CompactionMeta = (*meta)(nil)
|
||||
|
@ -448,13 +448,13 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
|
|||
log.Info("meta has already been changed, skip meta change and retry sync segments")
|
||||
} else {
|
||||
// Also prepare metric updates.
|
||||
newSegment, metricMutation, err := c.meta.CompleteCompactionMutation(plan, result)
|
||||
newSegments, metricMutation, err := c.meta.CompleteCompactionMutation(plan, result)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Apply metrics after successful meta update.
|
||||
metricMutation.commit()
|
||||
newSegmentInfo = newSegment
|
||||
newSegmentInfo = newSegments[0]
|
||||
}
|
||||
|
||||
nodeID := c.plans[plan.GetPlanID()].dataNodeID
|
||||
|
|
|
@ -362,8 +362,9 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
|
|||
s.Run("sync segment error", func() {
|
||||
s.SetupTest()
|
||||
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
|
||||
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
|
||||
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(
|
||||
NewSegmentInfo(&datapb.SegmentInfo{ID: 100}),
|
||||
[]*SegmentInfo{segment},
|
||||
&segMetricMutation{}, nil).Once()
|
||||
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
|
||||
|
||||
|
@ -400,8 +401,9 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() {
|
|||
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
// mock for handleMergeCompactionResult
|
||||
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
|
||||
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
|
||||
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(
|
||||
NewSegmentInfo(&datapb.SegmentInfo{ID: 100}),
|
||||
[]*SegmentInfo{segment},
|
||||
&segMetricMutation{}, nil).Once()
|
||||
s.mockSch.EXPECT().Finish(mock.Anything, mock.Anything).Return()
|
||||
|
||||
|
|
|
@ -981,20 +981,21 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
|
|||
// CompleteCompactionMutation completes compaction mutation.
|
||||
func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan,
|
||||
result *datapb.CompactionPlanResult,
|
||||
) (*SegmentInfo, *segMetricMutation, error) {
|
||||
) ([]*SegmentInfo, *segMetricMutation, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
modSegments, segment, metricMutation, err := m.prepareCompactionMutation(plan, result)
|
||||
modSegments, segments, metricMutation, err := m.prepareCompactionMutation(plan, result)
|
||||
if err != nil {
|
||||
log.Warn("fail to prepare for complete compaction mutation", zap.Error(err), zap.Int64("planID", plan.GetPlanID()))
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if err := m.alterMetaStoreAfterCompaction(segment, modSegments); err != nil {
|
||||
log.Warn("fail to alert meta store", zap.Error(err), zap.Int64("segmentID", segment.GetID()), zap.Int64("planID", plan.GetPlanID()))
|
||||
if err := m.alterMetaStoreAfterCompaction(segments, modSegments); err != nil {
|
||||
newSegIDs := lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })
|
||||
log.Warn("fail to alert meta store", zap.Error(err), zap.Int64s("segmentIDs", newSegIDs), zap.Int64("planID", plan.GetPlanID()))
|
||||
return nil, nil, err
|
||||
}
|
||||
return segment, metricMutation, err
|
||||
return segments, metricMutation, err
|
||||
}
|
||||
|
||||
// prepareCompactionMutation returns
|
||||
|
@ -1002,9 +1003,10 @@ func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan,
|
|||
// - the segment info of compactedTo segment after compaction to add
|
||||
// The compactedTo segment could contain 0 numRows
|
||||
// TODO: too complicated
|
||||
// TODO: support Major compaction
|
||||
func (m *meta) prepareCompactionMutation(plan *datapb.CompactionPlan,
|
||||
result *datapb.CompactionPlanResult,
|
||||
) ([]*SegmentInfo, *SegmentInfo, *segMetricMutation, error) {
|
||||
) ([]*SegmentInfo, []*SegmentInfo, *segMetricMutation, error) {
|
||||
log.Info("meta update: prepare for complete compaction mutation")
|
||||
compactionLogs := plan.GetSegmentBinlogs()
|
||||
|
||||
|
@ -1100,7 +1102,7 @@ func (m *meta) prepareCompactionMutation(plan *datapb.CompactionPlan,
|
|||
zap.Int64("new segment num of rows", segment.GetNumOfRows()),
|
||||
zap.Any("compacted from", segment.GetCompactionFrom()))
|
||||
|
||||
return modSegments, segment, metricMutation, nil
|
||||
return modSegments, []*SegmentInfo{segment}, metricMutation, nil
|
||||
}
|
||||
|
||||
func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, partitionID, targetSegmentID int64) ([]*datapb.FieldBinlog, error) {
|
||||
|
@ -1124,46 +1126,47 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) error {
|
||||
func (m *meta) alterMetaStoreAfterCompaction(segmentsCompactTo []*SegmentInfo, segmentsCompactFrom []*SegmentInfo) error {
|
||||
modInfos := make([]*datapb.SegmentInfo, 0, len(segmentsCompactFrom))
|
||||
for _, segment := range segmentsCompactFrom {
|
||||
modInfos = append(modInfos, segment.SegmentInfo)
|
||||
}
|
||||
|
||||
newSegment := segmentCompactTo.SegmentInfo
|
||||
|
||||
modSegIDs := lo.Map(modInfos, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
|
||||
if newSegment.GetNumOfRows() == 0 {
|
||||
newSegment.State = commonpb.SegmentState_Dropped
|
||||
newSegments := make([]*datapb.SegmentInfo, len(segmentsCompactTo))
|
||||
binlogsIncrements := make([]metastore.BinlogsIncrement, len(segmentsCompactTo))
|
||||
for i, seg := range segmentsCompactTo {
|
||||
newSegment := seg.SegmentInfo
|
||||
if newSegment.GetNumOfRows() == 0 {
|
||||
newSegment.State = commonpb.SegmentState_Dropped
|
||||
}
|
||||
newSegments[i] = newSegment
|
||||
binlogsIncrements[i] = metastore.BinlogsIncrement{
|
||||
Segment: newSegment,
|
||||
}
|
||||
}
|
||||
modSegIDs := lo.Map(modInfos, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
|
||||
newSegIDs := lo.Map(newSegments, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
|
||||
|
||||
log.Debug("meta update: alter meta store for compaction updates",
|
||||
zap.Int64s("compact from segments (segments to be updated as dropped)", modSegIDs),
|
||||
zap.Int64("new segmentID", newSegment.GetID()),
|
||||
zap.Int("binlog", len(newSegment.GetBinlogs())),
|
||||
zap.Int("stats log", len(newSegment.GetStatslogs())),
|
||||
zap.Int("delta logs", len(newSegment.GetDeltalogs())),
|
||||
zap.Int64("compact to segment", newSegment.GetID()))
|
||||
zap.Int64s("compact from segment IDs", modSegIDs),
|
||||
zap.Int64s("compact to segment IDs", newSegIDs))
|
||||
|
||||
err := m.catalog.AlterSegments(m.ctx, append(modInfos, newSegment), metastore.BinlogsIncrement{
|
||||
Segment: newSegment,
|
||||
})
|
||||
err := m.catalog.AlterSegments(m.ctx, append(modInfos, newSegments...), binlogsIncrements...)
|
||||
if err != nil {
|
||||
log.Warn("fail to alter segments and new segment", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
var compactFromIDs []int64
|
||||
for _, v := range segmentsCompactFrom {
|
||||
compactFromIDs = append(compactFromIDs, v.GetID())
|
||||
}
|
||||
for _, s := range segmentsCompactFrom {
|
||||
m.segments.SetSegment(s.GetID(), s)
|
||||
}
|
||||
m.segments.SetSegment(segmentCompactTo.GetID(), segmentCompactTo)
|
||||
for _, s := range segmentsCompactTo {
|
||||
m.segments.SetSegment(s.GetID(), s)
|
||||
}
|
||||
|
||||
log.Info("meta update: alter in memory meta after compaction - complete",
|
||||
zap.Int64("compact to segment ID", segmentCompactTo.GetID()),
|
||||
zap.Int64s("compact from segment IDs", compactFromIDs))
|
||||
zap.Int64s("compact from segment IDs", modSegIDs),
|
||||
zap.Int64s("compact to segment IDs", newSegIDs))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -272,17 +272,22 @@ func (suite *MetaBasicSuite) TestPrepareCompleteCompactionMutation() {
|
|||
suite.NotZero(afterCompact[0].GetDroppedAt())
|
||||
suite.NotZero(afterCompact[1].GetDroppedAt())
|
||||
|
||||
suite.Equal(inSegment.SegmentID, newSegment.GetID())
|
||||
suite.Equal(UniqueID(100), newSegment.GetCollectionID())
|
||||
suite.Equal(UniqueID(10), newSegment.GetPartitionID())
|
||||
suite.Equal(inSegment.NumOfRows, newSegment.GetNumOfRows())
|
||||
suite.Equal(commonpb.SegmentState_Flushed, newSegment.GetState())
|
||||
suite.Equal(inSegment.SegmentID, newSegment[0].GetID())
|
||||
suite.Equal(UniqueID(100), newSegment[0].GetCollectionID())
|
||||
suite.Equal(UniqueID(10), newSegment[0].GetPartitionID())
|
||||
suite.Equal(inSegment.NumOfRows, newSegment[0].GetNumOfRows())
|
||||
suite.Equal(commonpb.SegmentState_Flushed, newSegment[0].GetState())
|
||||
|
||||
suite.EqualValues(inSegment.GetInsertLogs(), newSegment.GetBinlogs())
|
||||
suite.EqualValues(inSegment.GetField2StatslogPaths(), newSegment.GetStatslogs())
|
||||
suite.EqualValues(inSegment.GetDeltalogs(), newSegment.GetDeltalogs())
|
||||
suite.NotZero(newSegment.lastFlushTime)
|
||||
suite.Equal(uint64(15), newSegment.GetLastExpireTime())
|
||||
suite.EqualValues(inSegment.GetInsertLogs(), newSegment[0].GetBinlogs())
|
||||
suite.EqualValues(inSegment.GetField2StatslogPaths(), newSegment[0].GetStatslogs())
|
||||
suite.EqualValues(inSegment.GetDeltalogs(), newSegment[0].GetDeltalogs())
|
||||
suite.NotZero(newSegment[0].lastFlushTime)
|
||||
suite.Equal(uint64(15), newSegment[0].GetLastExpireTime())
|
||||
|
||||
segmentsDone, metricMutationDone, err := m.CompleteCompactionMutation(plan, inCompactionResult)
|
||||
suite.NoError(err)
|
||||
suite.NotNil(segmentsDone)
|
||||
suite.NotNil(metricMutationDone)
|
||||
}
|
||||
|
||||
func TestMeta(t *testing.T) {
|
||||
|
@ -773,7 +778,7 @@ func TestMeta_alterMetaStore(t *testing.T) {
|
|||
}},
|
||||
}
|
||||
|
||||
err := m.alterMetaStoreAfterCompaction(&SegmentInfo{SegmentInfo: newSeg}, lo.Map(toAlter, func(t *datapb.SegmentInfo, _ int) *SegmentInfo {
|
||||
err := m.alterMetaStoreAfterCompaction([]*SegmentInfo{{SegmentInfo: newSeg}}, lo.Map(toAlter, func(t *datapb.SegmentInfo, _ int) *SegmentInfo {
|
||||
return &SegmentInfo{SegmentInfo: t}
|
||||
}))
|
||||
assert.NoError(t, err)
|
||||
|
|
|
@ -21,20 +21,20 @@ func (_m *MockCompactionMeta) EXPECT() *MockCompactionMeta_Expecter {
|
|||
}
|
||||
|
||||
// CompleteCompactionMutation provides a mock function with given fields: plan, result
|
||||
func (_m *MockCompactionMeta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) (*SegmentInfo, *segMetricMutation, error) {
|
||||
func (_m *MockCompactionMeta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
|
||||
ret := _m.Called(plan, result)
|
||||
|
||||
var r0 *SegmentInfo
|
||||
var r0 []*SegmentInfo
|
||||
var r1 *segMetricMutation
|
||||
var r2 error
|
||||
if rf, ok := ret.Get(0).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) (*SegmentInfo, *segMetricMutation, error)); ok {
|
||||
if rf, ok := ret.Get(0).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)); ok {
|
||||
return rf(plan, result)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) *SegmentInfo); ok {
|
||||
if rf, ok := ret.Get(0).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) []*SegmentInfo); ok {
|
||||
r0 = rf(plan, result)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*SegmentInfo)
|
||||
r0 = ret.Get(0).([]*SegmentInfo)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,12 +74,12 @@ func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) Run(run func(plan
|
|||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) Return(_a0 *SegmentInfo, _a1 *segMetricMutation, _a2 error) *MockCompactionMeta_CompleteCompactionMutation_Call {
|
||||
func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) Return(_a0 []*SegmentInfo, _a1 *segMetricMutation, _a2 error) *MockCompactionMeta_CompleteCompactionMutation_Call {
|
||||
_c.Call.Return(_a0, _a1, _a2)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) RunAndReturn(run func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) (*SegmentInfo, *segMetricMutation, error)) *MockCompactionMeta_CompleteCompactionMutation_Call {
|
||||
func (_c *MockCompactionMeta_CompleteCompactionMutation_Call) RunAndReturn(run func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)) *MockCompactionMeta_CompleteCompactionMutation_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue