fix: Remove L0 compactor in completedCompactor (#33169)

See also: #33168

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/32810/head
XuanYang-cn 2024-05-21 11:35:38 +08:00 committed by GitHub
parent f8929cc36a
commit b3bcc107bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 59 additions and 16 deletions

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.30.1. DO NOT EDIT.
// Code generated by mockery v2.32.4. DO NOT EDIT.
package broker
@ -63,8 +63,8 @@ type MockBroker_AssignSegmentID_Call struct {
}
// AssignSegmentID is a helper method to define mock.On call
// - ctx context.Context
// - reqs ...*datapb.SegmentIDRequest
// - ctx context.Context
// - reqs ...*datapb.SegmentIDRequest
func (_e *MockBroker_Expecter) AssignSegmentID(ctx interface{}, reqs ...interface{}) *MockBroker_AssignSegmentID_Call {
return &MockBroker_AssignSegmentID_Call{Call: _e.mock.On("AssignSegmentID",
append([]interface{}{ctx}, reqs...)...)}
@ -125,8 +125,8 @@ type MockBroker_DropVirtualChannel_Call struct {
}
// DropVirtualChannel is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.DropVirtualChannelRequest
// - ctx context.Context
// - req *datapb.DropVirtualChannelRequest
func (_e *MockBroker_Expecter) DropVirtualChannel(ctx interface{}, req interface{}) *MockBroker_DropVirtualChannel_Call {
return &MockBroker_DropVirtualChannel_Call{Call: _e.mock.On("DropVirtualChannel", ctx, req)}
}
@ -180,8 +180,8 @@ type MockBroker_GetSegmentInfo_Call struct {
}
// GetSegmentInfo is a helper method to define mock.On call
// - ctx context.Context
// - segmentIDs []int64
// - ctx context.Context
// - segmentIDs []int64
func (_e *MockBroker_Expecter) GetSegmentInfo(ctx interface{}, segmentIDs interface{}) *MockBroker_GetSegmentInfo_Call {
return &MockBroker_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo", ctx, segmentIDs)}
}
@ -223,8 +223,8 @@ type MockBroker_ReportTimeTick_Call struct {
}
// ReportTimeTick is a helper method to define mock.On call
// - ctx context.Context
// - msgs []*msgpb.DataNodeTtMsg
// - ctx context.Context
// - msgs []*msgpb.DataNodeTtMsg
func (_e *MockBroker_Expecter) ReportTimeTick(ctx interface{}, msgs interface{}) *MockBroker_ReportTimeTick_Call {
return &MockBroker_ReportTimeTick_Call{Call: _e.mock.On("ReportTimeTick", ctx, msgs)}
}
@ -266,8 +266,8 @@ type MockBroker_SaveBinlogPaths_Call struct {
}
// SaveBinlogPaths is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.SaveBinlogPathsRequest
// - ctx context.Context
// - req *datapb.SaveBinlogPathsRequest
func (_e *MockBroker_Expecter) SaveBinlogPaths(ctx interface{}, req interface{}) *MockBroker_SaveBinlogPaths_Call {
return &MockBroker_SaveBinlogPaths_Call{Call: _e.mock.On("SaveBinlogPaths", ctx, req)}
}
@ -309,8 +309,8 @@ type MockBroker_UpdateChannelCheckpoint_Call struct {
}
// UpdateChannelCheckpoint is a helper method to define mock.On call
// - ctx context.Context
// - channelCPs []*msgpb.MsgPosition
// - ctx context.Context
// - channelCPs []*msgpb.MsgPosition
func (_e *MockBroker_Expecter) UpdateChannelCheckpoint(ctx interface{}, channelCPs interface{}) *MockBroker_UpdateChannelCheckpoint_Call {
return &MockBroker_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, channelCPs)}
}
@ -352,8 +352,8 @@ type MockBroker_UpdateSegmentStatistics_Call struct {
}
// UpdateSegmentStatistics is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.UpdateSegmentStatisticsRequest
// - ctx context.Context
// - req *datapb.UpdateSegmentStatisticsRequest
func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call {
return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)}
}

View File

@ -190,9 +190,10 @@ func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanR
return true
})
// remote level zero results
// remove level zero results
lo.ForEach(completedLevelZero, func(planID int64, _ int) {
c.completed.Remove(planID)
c.completedCompactor.Remove(planID)
})
if len(results) > 0 {

View File

@ -21,7 +21,9 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
@ -115,6 +117,46 @@ func TestCompactionExecutor(t *testing.T) {
t.FailNow()
}
})
t.Run("test getAllCompactionResults", func(t *testing.T) {
ex := newCompactionExecutor()
mockC := newMockCompactor(true)
ex.executing.Insert(int64(1), mockC)
ex.completedCompactor.Insert(int64(2), mockC)
ex.completed.Insert(int64(2), &datapb.CompactionPlanResult{
PlanID: 2,
State: commonpb.CompactionState_Completed,
Type: datapb.CompactionType_MixCompaction,
})
ex.completedCompactor.Insert(int64(3), mockC)
ex.completed.Insert(int64(3), &datapb.CompactionPlanResult{
PlanID: 3,
State: commonpb.CompactionState_Completed,
Type: datapb.CompactionType_Level0DeleteCompaction,
})
require.Equal(t, 2, ex.completed.Len())
require.Equal(t, 2, ex.completedCompactor.Len())
require.Equal(t, 1, ex.executing.Len())
result := ex.getAllCompactionResults()
assert.Equal(t, 3, len(result))
for _, res := range result {
if res.PlanID == int64(1) {
assert.Equal(t, res.GetState(), commonpb.CompactionState_Executing)
} else {
assert.Equal(t, res.GetState(), commonpb.CompactionState_Completed)
}
}
assert.Equal(t, 1, ex.completed.Len())
require.Equal(t, 1, ex.completedCompactor.Len())
require.Equal(t, 1, ex.executing.Len())
})
}
func newMockCompactor(isvalid bool) *mockCompactor {