fix: Fix L0 compaction in datacoord (#28814)

See also: #27606

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/29006/head
XuanYang-cn 2023-12-05 18:44:37 +08:00 committed by GitHub
parent cb31016640
commit 5bac7f7897
18 changed files with 1019 additions and 199 deletions


@ -429,6 +429,8 @@ generate-mockery-datacoord: getdeps
$(INSTALL_PATH)/mockery --name=IndexEngineVersionManager --dir=internal/datacoord --filename=mock_index_engine_version_manager.go --output=internal/datacoord --structname=MockVersionManager --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=TriggerManager --dir=internal/datacoord --filename=mock_trigger_manager.go --output=internal/datacoord --structname=MockTriggerManager --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=compactionPlanContext --dir=internal/datacoord --filename=mock_compaction_plan_context.go --output=internal/datacoord --structname=MockCompactionPlanContext --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=CompactionMeta --dir=internal/datacoord --filename=mock_compaction_meta.go --output=internal/datacoord --structname=MockCompactionMeta --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=Scheduler --dir=internal/datacoord --filename=mock_scheduler.go --output=internal/datacoord --structname=MockScheduler --with-expecter --inpackage
generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage
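
These two new mockery targets generate the MockCompactionMeta and MockScheduler types added later in this commit. A minimal sketch of how such a generated mock is exercised in a test (the test itself is illustrative, not part of this commit):

package datacoord

import "testing"

// Illustrative only: drive a mock produced by the mockery target above.
func TestMockSchedulerSketch(t *testing.T) {
    sch := NewMockScheduler(t)                   // registers a cleanup that asserts expectations
    sch.EXPECT().GetTaskCount().Return(0).Once() // expect exactly one call
    if got := sch.GetTaskCount(); got != 0 {
        t.Fatalf("expected 0 tasks, got %d", got)
    }
}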


@ -70,6 +70,18 @@ var (
errChannelInBuffer = errors.New("channel is in buffer")
)
type CompactionMeta interface {
SelectSegments(selector SegmentInfoSelector) []*SegmentInfo
GetHealthySegment(segID UniqueID) *SegmentInfo
UpdateSegmentsInfo(operators ...UpdateOperator) error
SetSegmentCompacting(segmentID int64, compacting bool)
PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *SegmentInfo, *segMetricMutation, error)
alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) error
}
var _ CompactionMeta = (*meta)(nil)
type compactionTask struct {
triggerInfo *compactionSignal
plan *datapb.CompactionPlan
@ -97,18 +109,18 @@ type compactionPlanHandler struct {
mu sync.RWMutex
plans map[int64]*compactionTask // planID -> task
meta *meta
meta CompactionMeta
allocator allocator
chManager *ChannelManager
sessions *SessionManager
scheduler *CompactionScheduler
scheduler Scheduler
stopCh chan struct{}
stopOnce sync.Once
stopWg sync.WaitGroup
}
func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta *meta, allocator allocator,
func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta CompactionMeta, allocator allocator,
) *compactionPlanHandler {
return &compactionPlanHandler{
plans: make(map[int64]*compactionTask),
@ -120,6 +132,26 @@ func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta
}
}
func (c *compactionPlanHandler) checkResult() {
// deal with compaction results
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
}
_ = c.updateCompaction(ts)
}
func (c *compactionPlanHandler) schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}
}
func (c *compactionPlanHandler) start() {
interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second)
c.stopCh = make(chan struct{})
@ -136,16 +168,7 @@ func (c *compactionPlanHandler) start() {
log.Info("compaction handler check result loop quit")
return
case <-checkResultTicker.C:
// deal results
cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ts, err := c.allocator.allocTimestamp(cctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
cancel()
continue
}
cancel()
_ = c.updateCompaction(ts)
c.checkResult()
}
}
}()
@ -164,13 +187,7 @@ func (c *compactionPlanHandler) start() {
return
case <-scheduleTicker.C:
// schedule queuing tasks
tasks := c.scheduler.schedule()
c.notifyTasks(tasks)
if len(tasks) > 0 {
c.scheduler.logStatus()
}
c.schedule()
}
}
}()
@ -188,7 +205,12 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
defer c.mu.Unlock()
for id, task := range c.plans {
if task.triggerInfo.channel == channel {
c.scheduler.finish(task.dataNodeID, task.plan.PlanID)
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel),
zap.Int64("planID", task.plan.GetPlanID()),
zap.Int64("node", task.dataNodeID),
)
c.scheduler.Finish(task.dataNodeID, task.plan.PlanID)
delete(c.plans, id)
}
}
@ -197,11 +219,9 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskOpt) {
c.mu.Lock()
defer c.mu.Unlock()
plan, ok := c.plans[planID]
if !ok {
return
if plan, ok := c.plans[planID]; ok {
c.plans[planID] = plan.shadowClone(opts...)
}
c.plans[planID] = plan.shadowClone(opts...)
}
func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
@ -231,10 +251,11 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data
func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) {
plan := task.plan
log := log.With(zap.Int64("taskID", task.triggerInfo.id), zap.Int64("planID", plan.GetPlanID()))
if plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
sealedSegments := c.meta.SelectSegments(func(info *SegmentInfo) bool {
return info.GetCollectionID() == task.triggerInfo.collectionID &&
info.GetPartitionID() == task.triggerInfo.partitionID &&
(task.triggerInfo.partitionID == -1 || info.GetPartitionID() == task.triggerInfo.partitionID) &&
info.GetInsertChannel() == plan.GetChannel() &&
isFlushState(info.GetState()) &&
!info.isCompacting &&
@ -251,14 +272,17 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) {
})
plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...)
log.Info("Compaction handler refreshed level zero compaction plan", zap.Any("target segments", sealedSegBinlogs))
return
}
if plan.GetType() == datapb.CompactionType_MixCompaction {
for _, seg := range plan.GetSegmentBinlogs() {
info := c.meta.GetSegment(seg.GetSegmentID())
seg.Deltalogs = info.GetDeltalogs()
if info := c.meta.GetHealthySegment(seg.GetSegmentID()); info != nil {
seg.Deltalogs = info.GetDeltalogs()
}
}
log.Info("Compaction handler refresed mix compaction plan")
return
}
}
@ -317,12 +341,16 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionPlan
plan := c.plans[planID].plan
nodeID := c.plans[planID].dataNodeID
defer c.scheduler.finish(nodeID, plan.PlanID)
defer c.scheduler.Finish(nodeID, plan.PlanID)
switch plan.GetType() {
case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction:
if err := c.handleMergeCompactionResult(plan, result); err != nil {
return err
}
case datapb.CompactionType_Level0DeleteCompaction:
if err := c.handleL0CompactionResult(plan, result); err != nil {
return err
}
default:
return errors.New("unknown compaction type")
}
@ -332,6 +360,26 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionPlan
return nil
}
func (c *compactionPlanHandler) handleL0CompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) error {
var operators []UpdateOperator
for _, seg := range result.GetSegments() {
operators = append(operators, UpdateBinlogsOperator(seg.GetSegmentID(), nil, nil, seg.GetDeltalogs()))
}
levelZeroSegments := lo.Filter(plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) bool {
return b.GetLevel() == datapb.SegmentLevel_L0
})
for _, seg := range levelZeroSegments {
operators = append(operators, UpdateStatusOperator(seg.SegmentID, commonpb.SegmentState_Dropped))
}
log.Info("meta update: update segments info for level zero compaction",
zap.Int64("planID", plan.GetPlanID()),
)
return c.meta.UpdateSegmentsInfo(operators...)
}
func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) error {
log := log.With(zap.Int64("planID", plan.GetPlanID()))
if len(result.GetSegments()) == 0 || len(result.GetSegments()) > 1 {
@ -346,7 +394,7 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
log.Info("meta has already been changed, skip meta change and retry sync segments")
} else {
// Also prepare metric updates.
_, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan, result)
modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan, result)
if err != nil {
return err
}
@ -434,7 +482,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
c.setSegmentsCompacting(task.plan, false)
c.scheduler.finish(task.dataNodeID, task.plan.PlanID)
c.scheduler.Finish(task.dataNodeID, task.plan.PlanID)
}
// Tasks that exceed the timeout will be marked timed out and failed in DataNode
@ -448,7 +496,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
c.setSegmentsCompacting(task.plan, false)
c.scheduler.finish(task.dataNodeID, task.plan.PlanID)
c.scheduler.Finish(task.dataNodeID, task.plan.PlanID)
}
// DataNode also checks whether plans have timed out, but it is less sensitive than DataCoord,
@ -471,7 +519,7 @@ func (c *compactionPlanHandler) isTimeout(now Timestamp, start Timestamp, timeou
// isFull return true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
return c.scheduler.getExecutingTaskNum() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
return c.scheduler.GetTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
}
func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*compactionTask {
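
The heart of the new L0 handling above is the update-operator pattern: handleL0CompactionResult collects UpdateOperator values and commits them through a single UpdateSegmentsInfo call. A condensed sketch, assuming package datacoord; the segment IDs 100 and 200 and the helper name are illustrative:

// Sketch of the operator pattern behind handleL0CompactionResult.
func applyL0ResultSketch(meta CompactionMeta, deltalogs []*datapb.FieldBinlog) error {
    operators := []UpdateOperator{
        // attach the compacted deltalogs to the target sealed segment
        UpdateBinlogsOperator(200, nil, nil, deltalogs),
        // drop the consumed level-zero segment
        UpdateStatusOperator(100, commonpb.SegmentState_Dropped),
    }
    return meta.UpdateSegmentsInfo(operators...) // one atomic meta update
}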


@ -27,6 +27,15 @@ func (v *LevelZeroSegmentsView) String() string {
l0strings)
}
func (v *LevelZeroSegmentsView) Append(segments ...*SegmentView) {
if v.segments == nil {
v.segments = segments
return
}
v.segments = append(v.segments, segments...)
}
func (v *LevelZeroSegmentsView) GetGroupLabel() *CompactionGroupLabel {
if v == nil {
return &CompactionGroupLabel{}


@ -14,6 +14,10 @@ import (
type Scheduler interface {
Submit(t ...*compactionTask)
Schedule() []*compactionTask
Finish(nodeID, planID int64)
GetTaskCount() int
LogStatus()
// Start()
// Stop()
@ -46,11 +50,11 @@ func (s *CompactionScheduler) Submit(tasks ...*compactionTask) {
s.mu.Unlock()
s.taskNumber.Add(int32(len(tasks)))
s.logStatus()
s.LogStatus()
}
// schedule pick 1 or 0 tasks for 1 node
func (s *CompactionScheduler) schedule() []*compactionTask {
// Schedule picks 1 or 0 tasks for 1 node
func (s *CompactionScheduler) Schedule() []*compactionTask {
nodeTasks := make(map[int64][]*compactionTask) // nodeID -> tasks
s.mu.Lock()
@ -138,7 +142,9 @@ func (s *CompactionScheduler) schedule() []*compactionTask {
return lo.Values(executable)
}
func (s *CompactionScheduler) finish(nodeID, planID UniqueID) {
func (s *CompactionScheduler) Finish(nodeID, planID UniqueID) {
log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
s.mu.Lock()
if parallel, ok := s.parallelTasks[nodeID]; ok {
tasks := lo.Filter(parallel, func(t *compactionTask, _ int) bool {
@ -146,14 +152,23 @@ func (s *CompactionScheduler) finish(nodeID, planID UniqueID) {
})
s.parallelTasks[nodeID] = tasks
s.taskNumber.Dec()
log.Info("Compaction scheduler remove task from executing")
}
s.mu.Unlock()
log.Info("Compaction finished", zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
s.logStatus()
filtered := lo.Filter(s.queuingTasks, func(t *compactionTask, _ int) bool {
return t.plan.PlanID != planID
})
if len(filtered) < len(s.queuingTasks) {
s.queuingTasks = filtered
s.taskNumber.Dec()
log.Info("Compaction scheduler remove task from queue")
}
s.mu.Unlock()
s.LogStatus()
}
func (s *CompactionScheduler) logStatus() {
func (s *CompactionScheduler) LogStatus() {
s.mu.RLock()
defer s.mu.RUnlock()
waiting := lo.Map(s.queuingTasks, func(t *compactionTask, _ int) int64 {
@ -172,6 +187,6 @@ func (s *CompactionScheduler) logStatus() {
}
}
func (s *CompactionScheduler) getExecutingTaskNum() int {
func (s *CompactionScheduler) GetTaskCount() int {
return int(s.taskNumber.Load())
}
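
With the scheduler methods exported, the intended lifecycle is Submit, periodic Schedule rounds, then Finish once a plan completes or fails. A hedged sketch inside package datacoord (the driver function and notify callback are illustrative):

// Hypothetical driver over the exported Scheduler API.
func runSchedulerSketch(sch Scheduler, tasks []*compactionTask, notify func(*compactionTask)) {
    sch.Submit(tasks...)               // enqueue; bumps the task counter and logs status
    for _, t := range sch.Schedule() { // at most one task is picked per node per round
        notify(t)
    }
    for _, t := range tasks { // on completion, failure, or timeout
        sch.Finish(t.dataNodeID, t.plan.PlanID)
    }
}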


@ -38,10 +38,10 @@ func (s *SchedulerSuite) SetupTest() {
func (s *SchedulerSuite) TestScheduleEmpty() {
emptySch := NewCompactionScheduler()
tasks := emptySch.schedule()
tasks := emptySch.Schedule()
s.Empty(tasks)
s.Equal(0, emptySch.getExecutingTaskNum())
s.Equal(0, emptySch.GetTaskCount())
s.Empty(emptySch.queuingTasks)
s.Empty(emptySch.parallelTasks)
}
@ -67,13 +67,13 @@ func (s *SchedulerSuite) TestScheduleParallelTaskFull() {
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
s.Require().Equal(4, s.scheduler.GetTaskCount())
// submit the testing tasks
s.scheduler.Submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
gotTasks := s.scheduler.schedule()
gotTasks := s.scheduler.Schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
@ -106,22 +106,22 @@ func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() {
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
s.Require().Equal(4, s.scheduler.GetTaskCount())
// submit the testing tasks
s.scheduler.Submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
gotTasks := s.scheduler.schedule()
gotTasks := s.scheduler.Schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
// the second schedule returns empty because parallelTasks is full
gotTasks = s.scheduler.schedule()
gotTasks = s.scheduler.Schedule()
s.Empty(gotTasks)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
})
}
}
@ -153,24 +153,24 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
s.Require().Equal(4, s.scheduler.GetTaskCount())
// submit the testing tasks
s.scheduler.Submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
gotTasks := s.scheduler.schedule()
gotTasks := s.scheduler.Schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
// the second schedule returns empty because parallelTasks is full
if len(gotTasks) > 0 {
gotTasks = s.scheduler.schedule()
gotTasks = s.scheduler.Schedule()
s.Empty(gotTasks)
}
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
})
}
}


@ -23,9 +23,11 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -39,6 +41,203 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestCompactionPlanHandlerSuite(t *testing.T) {
suite.Run(t, new(CompactionPlanHandlerSuite))
}
type CompactionPlanHandlerSuite struct {
suite.Suite
mockMeta *MockCompactionMeta
mockAlloc *NMockAllocator
mockSch *MockScheduler
}
func (s *CompactionPlanHandlerSuite) SetupTest() {
s.mockMeta = NewMockCompactionMeta(s.T())
s.mockAlloc = NewNMockAllocator(s.T())
s.mockSch = NewMockScheduler(s.T())
}
func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
s.mockSch.EXPECT().Finish(mock.Anything, mock.Anything).Return().Once()
handler := newCompactionPlanHandler(nil, nil, nil, nil)
handler.scheduler = s.mockSch
var ch string = "ch1"
handler.mu.Lock()
handler.plans[1] = &compactionTask{
plan: &datapb.CompactionPlan{PlanID: 19530},
dataNodeID: 1,
triggerInfo: &compactionSignal{channel: ch},
}
handler.mu.Unlock()
handler.removeTasksByChannel(ch)
handler.mu.Lock()
s.Equal(0, len(handler.plans))
handler.mu.Unlock()
}
func (s *CompactionPlanHandlerSuite) TestCheckResult() {
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil)
session := &SessionManager{
sessions: struct {
sync.RWMutex
data map[int64]*Session
}{
data: map[int64]*Session{
2: {client: &mockDataNodeClient{
compactionStateResp: &datapb.CompactionStateResponse{
Results: []*datapb.CompactionPlanResult{
{PlanID: 1, State: commonpb.CompactionState_Executing},
{PlanID: 3, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 3}}},
{PlanID: 4, State: commonpb.CompactionState_Executing},
{PlanID: 6, State: commonpb.CompactionState_Executing},
},
},
}},
},
},
}
handler := newCompactionPlanHandler(session, nil, nil, s.mockAlloc)
handler.checkResult()
}
func (s *CompactionPlanHandlerSuite) TestHandleL0CompactionResults() {
channel := "Ch-1"
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Run(func(operators ...UpdateOperator) {
s.Equal(5, len(operators))
}).Return(nil).Once()
deltalogs := []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}
// 2 L0 segments, 3 sealed L1 segments
plan := &datapb.CompactionPlan{
PlanID: 1,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 100,
Deltalogs: deltalogs,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
},
{
SegmentID: 101,
Deltalogs: deltalogs,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
},
{
SegmentID: 200,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
},
{
SegmentID: 201,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
},
{
SegmentID: 202,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
},
},
Type: datapb.CompactionType_Level0DeleteCompaction,
}
result := &datapb.CompactionPlanResult{
PlanID: plan.GetPlanID(),
State: commonpb.CompactionState_Completed,
Channel: channel,
Segments: []*datapb.CompactionSegment{
{
SegmentID: 200,
Deltalogs: deltalogs,
Channel: channel,
},
{
SegmentID: 201,
Deltalogs: deltalogs,
Channel: channel,
},
{
SegmentID: 202,
Deltalogs: deltalogs,
Channel: channel,
},
},
}
handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc)
err := handler.handleL0CompactionResult(plan, result)
s.NoError(err)
}
func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() {
channel := "Ch-1"
s.mockMeta.EXPECT().SelectSegments(mock.Anything).Return(
[]*SegmentInfo{
{SegmentInfo: &datapb.SegmentInfo{
ID: 200,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
}},
{SegmentInfo: &datapb.SegmentInfo{
ID: 201,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
}},
{SegmentInfo: &datapb.SegmentInfo{
ID: 202,
Level: datapb.SegmentLevel_L1,
InsertChannel: channel,
}},
},
)
deltalogs := []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}
// 2 L0 segments
plan := &datapb.CompactionPlan{
PlanID: 1,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 100,
Deltalogs: deltalogs,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
},
{
SegmentID: 101,
Deltalogs: deltalogs,
Level: datapb.SegmentLevel_L0,
InsertChannel: channel,
},
},
Type: datapb.CompactionType_Level0DeleteCompaction,
}
task := &compactionTask{
triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10},
state: executing,
plan: plan,
dataNodeID: 1,
}
handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc)
handler.RefreshPlan(task)
s.Equal(5, len(task.plan.GetSegmentBinlogs()))
segIDs := lo.Map(task.plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) int64 {
return b.GetSegmentID()
})
s.ElementsMatch([]int64{200, 201, 202, 100, 101}, segIDs)
}
func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
type fields struct {
plans map[int64]*compactionTask
@ -111,12 +310,13 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
scheduler := NewCompactionScheduler()
c := &compactionPlanHandler{
plans: tt.fields.plans,
sessions: tt.fields.sessions,
chManager: tt.fields.chManager,
allocator: tt.fields.allocatorFactory(),
scheduler: NewCompactionScheduler(),
scheduler: scheduler,
}
Params.Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "1")
c.start()
@ -127,13 +327,13 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
if !tt.wantErr {
assert.Equal(t, tt.args.plan, task.plan)
assert.Equal(t, tt.args.signal, task.triggerInfo)
assert.Equal(t, 1, c.scheduler.getExecutingTaskNum())
assert.Equal(t, 1, c.scheduler.GetTaskCount())
} else {
assert.Eventually(t,
func() bool {
c.scheduler.mu.RLock()
defer c.scheduler.mu.RUnlock()
return c.scheduler.getExecutingTaskNum() == 0 && len(c.scheduler.parallelTasks[1]) == 0
scheduler.mu.RLock()
defer scheduler.mu.RUnlock()
return c.scheduler.GetTaskCount() == 0 && len(scheduler.parallelTasks[1]) == 0
},
5*time.Second, 100*time.Millisecond)
}
@ -191,20 +391,11 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
err = c.execCompactionPlan(signal, plan3)
require.NoError(t, err)
assert.Equal(t, 3, c.scheduler.getExecutingTaskNum())
assert.Equal(t, 3, c.scheduler.GetTaskCount())
// parallelism for the same node is 2
tasks := c.scheduler.schedule()
assert.Equal(t, 1, len(tasks))
assert.Equal(t, int64(1), tasks[0].plan.PlanID)
assert.Equal(t, int64(1), tasks[0].dataNodeID)
c.notifyTasks(tasks)
tasks = c.scheduler.schedule()
assert.Equal(t, 1, len(tasks))
assert.Equal(t, int64(2), tasks[0].plan.PlanID)
assert.Equal(t, int64(1), tasks[0].dataNodeID)
c.notifyTasks(tasks)
c.schedule()
c.schedule()
// wait for compaction to be called
assert.Eventually(t, func() bool {
@ -213,7 +404,7 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
return called == 2
}, 3*time.Second, time.Millisecond*100)
tasks = c.scheduler.schedule()
tasks := c.scheduler.Schedule()
assert.Equal(t, 0, len(tasks))
}
@ -352,11 +543,11 @@ func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) {
},
}
has, err := c.meta.HasSegments([]UniqueID{1, 2})
has, err := meta.HasSegments([]UniqueID{1, 2})
require.NoError(t, err)
require.True(t, has)
has, err = c.meta.HasSegments([]UniqueID{3})
has, err = meta.HasSegments([]UniqueID{3})
require.Error(t, err)
require.False(t, has)
@ -369,7 +560,7 @@ func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) {
err = c2.handleMergeCompactionResult(plan, compactionResult2)
assert.Error(t, err)
has, err = c.meta.HasSegments([]UniqueID{1, 2, 3})
has, err = meta.HasSegments([]UniqueID{1, 2, 3})
require.NoError(t, err)
require.True(t, has)
@ -761,11 +952,12 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
scheduler := NewCompactionScheduler()
c := &compactionPlanHandler{
plans: tt.fields.plans,
sessions: tt.fields.sessions,
meta: tt.fields.meta,
scheduler: NewCompactionScheduler(),
scheduler: scheduler,
}
err := c.updateCompaction(tt.args.ts)
@ -786,9 +978,9 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
assert.NotEqual(t, failed, task.state)
}
c.scheduler.mu.Lock()
assert.Equal(t, 0, len(c.scheduler.parallelTasks[2]))
c.scheduler.mu.Unlock()
scheduler.mu.Lock()
assert.Equal(t, 0, len(scheduler.parallelTasks[2]))
scheduler.mu.Unlock()
})
}
}


@ -357,7 +357,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
isSegmentHealthy(segment) &&
isFlush(segment) &&
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() // not importing now
!segment.GetIsImporting() && // not importing now
segment.GetLevel() != datapb.SegmentLevel_L0 // ignore level zero segments
}) // m is a list of chanPartSegments: segments grouped by channel and partition
if len(m) == 0 {
@ -795,7 +796,8 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni
s.GetInsertChannel() != channel ||
s.GetPartitionID() != partitionID ||
s.isCompacting ||
s.GetIsImporting() {
s.GetIsImporting() ||
s.GetLevel() == datapb.SegmentLevel_L0 {
continue
}
res = append(res, s)
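
Both hunks share one intent: regular compaction triggers now skip level-zero segments, which are consumed exclusively by the new L0 trigger path. A condensed form of the candidate predicate (the helper is illustrative, merging the two filters):

// Illustrative: a segment qualifies for regular (non-L0) compaction only if
func isRegularCompactionCandidate(s *SegmentInfo, channel string, partitionID UniqueID) bool {
    return isSegmentHealthy(s) &&
        isFlush(s) &&
        s.GetInsertChannel() == channel &&
        s.GetPartitionID() == partitionID &&
        !s.isCompacting && // not compacting now
        !s.GetIsImporting() && // not importing now
        s.GetLevel() != datapb.SegmentLevel_L0 // L0 segments belong to the L0 trigger
}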


@ -50,22 +50,23 @@ func NewCompactionTriggerManager(meta *meta, alloc allocator, handler compaction
}
func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionTriggerType, views []CompactionView) {
log := log.With(zap.Int64("taskID", taskID))
for _, view := range views {
switch eventType {
case TriggerTypeLevelZeroView:
log.Info("Start to trigger a level zero compaction")
outView := view.Trigger()
if outView == nil {
continue
}
log.Info("Finish trigger out view, build level zero compaction plan", zap.String("out view", outView.String()))
plan := m.BuildLevelZeroCompactionPlan(outView)
if plan == nil {
continue
}
log.Info("Trigger a LevelZeroCompaction plan", zap.String("output view", outView.String()))
label := outView.GetGroupLabel()
signal := &compactionSignal{
id: taskID,
isForce: false,
@ -78,6 +79,7 @@ func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionT
// TODO, remove handler, use scheduler
// m.scheduler.Submit(plan)
m.handler.execCompactionPlan(signal, plan)
log.Info("Finish to trigger a LevelZeroCompaction plan", zap.String("output view", outView.String()))
}
}
}
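
For orientation, the per-view flow inside Notify, condensed into a sketch (the helper is illustrative; the real code also fills the signal from the out view's group label):

// Condensed flow of one TriggerTypeLevelZeroView event.
func notifyFlowSketch(m *CompactionTriggerManager, taskID UniqueID, view CompactionView) {
    outView := view.Trigger() // nil means the view decided not to compact yet
    if outView == nil {
        return
    }
    if plan := m.BuildLevelZeroCompactionPlan(outView); plan != nil {
        signal := &compactionSignal{id: taskID, isForce: false}
        m.handler.execCompactionPlan(signal, plan) // hand off to the plan handler
    }
}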


@ -53,8 +53,9 @@ func (s *CompactionTriggerManagerSuite) TestNotify() {
return info.GetLevel() == datapb.SegmentLevel_L0
})
segmentViews, levelZeroView := viewManager.GetLatestLevelZeroSegmentWithSignals(1, levelZeroSegments)
s.Require().NotEmpty(segmentViews)
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
s.Require().NotEmpty(latestL0Segments)
levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments)
s.Require().Equal(1, len(levelZeroView))
cView, ok := levelZeroView[0].(*LevelZeroSegmentsView)
s.True(ok)


@ -15,6 +15,7 @@ import (
type CompactionView interface {
GetGroupLabel() *CompactionGroupLabel
GetSegmentsView() []*SegmentView
Append(segments ...*SegmentView)
String() string
Trigger() CompactionView
}
@ -25,7 +26,7 @@ type FullViews struct {
type SegmentViewSelector func(view *SegmentView) bool
func (v *FullViews) GetSegmentViewBy(collectionID UniqueID, selectors SegmentViewSelector) []*SegmentView {
func (v *FullViews) GetSegmentViewBy(collectionID UniqueID, selector SegmentViewSelector) []*SegmentView {
views, ok := v.collections[collectionID]
if !ok {
return nil
@ -34,8 +35,8 @@ func (v *FullViews) GetSegmentViewBy(collectionID UniqueID, selectors SegmentVie
var ret []*SegmentView
for _, view := range views {
if selectors(view) {
ret = append(ret, view)
if selector == nil || selector(view) {
ret = append(ret, view.Clone())
}
}
@ -48,6 +49,10 @@ type CompactionGroupLabel struct {
Channel string
}
func (label *CompactionGroupLabel) Key() string {
return fmt.Sprintf("%d-%s", label.PartitionID, label.Channel)
}
func (label *CompactionGroupLabel) IsMinGroup() bool {
return len(label.Channel) != 0 && label.PartitionID != 0 && label.CollectionID != 0
}
@ -86,7 +91,24 @@ type SegmentView struct {
DeltalogCount int
}
func GetSegmentViews(segments ...*SegmentInfo) []*SegmentView {
func (s *SegmentView) Clone() *SegmentView {
return &SegmentView{
ID: s.ID,
label: s.label,
State: s.State,
Level: s.Level,
startPos: s.startPos,
dmlPos: s.dmlPos,
Size: s.Size,
ExpireSize: s.ExpireSize,
DeltaSize: s.DeltaSize,
BinlogCount: s.BinlogCount,
StatslogCount: s.StatslogCount,
DeltalogCount: s.DeltalogCount,
}
}
func GetViewsByInfo(segments ...*SegmentInfo) []*SegmentView {
return lo.Map(segments, func(segment *SegmentInfo, _ int) *SegmentView {
return &SegmentView{
ID: segment.ID,
@ -170,7 +192,3 @@ func GetBinlogSizeAsBytes(deltaBinlogs []*datapb.FieldBinlog) float64 {
}
return deltaSize
}
func buildGroupKey(partitionID UniqueID, channel string) string {
return fmt.Sprintf("%d-%s", partitionID, channel)
}
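
CompactionGroupLabel.Key replaces the removed free-standing buildGroupKey helper, and SegmentView.Clone lets GetSegmentViewBy hand out copies instead of aliases. A tiny illustration (values made up):

package datacoord

import "fmt"

// Illustrative only: Key() identifies an L0 compaction group per partition-channel pair.
func keySketch() {
    label := &CompactionGroupLabel{CollectionID: 1, PartitionID: 10, Channel: "ch-1"}
    fmt.Println(label.Key()) // prints "10-ch-1"
}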


@ -17,9 +17,9 @@ type CompactionViewManager struct {
view *FullViews
viewGuard sync.RWMutex
meta *meta
eventManager TriggerManager
allocator allocator
meta *meta
trigger TriggerManager
allocator allocator
closeSig chan struct{}
closeWg sync.WaitGroup
@ -30,10 +30,10 @@ func NewCompactionViewManager(meta *meta, trigger TriggerManager, allocator allo
view: &FullViews{
collections: make(map[int64][]*SegmentView),
},
meta: meta,
eventManager: trigger,
allocator: allocator,
closeSig: make(chan struct{}),
meta: meta,
trigger: trigger,
allocator: allocator,
closeSig: make(chan struct{}),
}
}
@ -58,6 +58,8 @@ func (m *CompactionViewManager) checkLoop() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
log.Info("Compaction view manager start")
for {
select {
case <-m.closeSig:
@ -79,11 +81,13 @@ func (m *CompactionViewManager) Check() {
ctx := context.TODO()
taskID, err := m.allocator.allocID(ctx)
if err != nil {
log.Warn("CompactionViewManager check loop failed, unable to allocate taskID",
log.Warn("CompactionViewManager check failed, unable to allocate taskID",
zap.Error(err))
return
}
log := log.With(zap.Int64("taskID", taskID))
m.viewGuard.Lock()
defer m.viewGuard.Unlock()
@ -93,86 +97,71 @@ func (m *CompactionViewManager) Check() {
latestCollIDs := lo.Keys(latestCollSegs)
viewCollIDs := lo.Keys(m.view.collections)
diffAdd, diffRemove := lo.Difference(latestCollIDs, viewCollIDs)
_, diffRemove := lo.Difference(latestCollIDs, viewCollIDs)
for _, collID := range diffRemove {
delete(m.view.collections, collID)
}
// TODO: update all segments views. For now, just update Level Zero Segments
for collID, segments := range latestCollSegs {
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
return info.GetLevel() == datapb.SegmentLevel_L0
})
// For new collection, TODO: update all segments
// - for now, just update Level Zero Segments
if lo.Contains(diffAdd, collID) {
m.view.collections[collID] = GetSegmentViews(levelZeroSegments...)
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
changedL0Views := m.getChangedLevelZeroViews(collID, latestL0Segments)
if len(changedL0Views) == 0 {
continue
}
latestLevelZeroViews, signals := m.GetLatestLevelZeroSegmentWithSignals(collID, levelZeroSegments)
if len(latestLevelZeroViews) != 0 {
m.view.collections[collID] = latestLevelZeroViews
}
log.Info("Refresh compaction level zero views",
zap.Int64("collectionID", collID),
zap.Strings("views", lo.Map(changedL0Views, func(view CompactionView, _ int) string {
return view.String()
})))
events[TriggerTypeLevelZeroView] = signals
m.view.collections[collID] = latestL0Segments
events[TriggerTypeLevelZeroView] = changedL0Views
}
for eType, views := range events {
m.eventManager.Notify(taskID, eType, views)
m.trigger.Notify(taskID, eType, views)
}
}
func (m *CompactionViewManager) GetLatestLevelZeroSegmentWithSignals(collID UniqueID, LevelZeroSegments []*SegmentInfo) ([]*SegmentView, []CompactionView) {
partChanView := m.BuildLevelZeroSegmentsView(collID, LevelZeroSegments)
func (m *CompactionViewManager) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) []CompactionView {
latestViews := m.groupL0ViewsByPartChan(collID, LevelZeroViews)
cachedViews := m.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
return v.Level == datapb.SegmentLevel_L0
})
var signals []CompactionView
var needUpdate bool = false
for _, latestView := range partChanView {
views := m.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
return v.label.PartitionID == latestView.label.PartitionID &&
v.label.Channel == latestView.label.Channel
for _, latestView := range latestViews {
views := lo.Filter(cachedViews, func(v *SegmentView, _ int) bool {
return v.label.Equal(latestView.GetGroupLabel())
})
if !latestView.Equal(views) {
needUpdate = true
signals = append(signals, latestView)
}
}
if needUpdate {
var allViews []*SegmentView
for _, latestView := range partChanView {
allViews = append(allViews, latestView.segments...)
}
return allViews, signals
}
return nil, signals
return signals
}
func (m *CompactionViewManager) BuildLevelZeroSegmentsView(collectionID UniqueID, levelZeroSegments []*SegmentInfo) []*LevelZeroSegmentsView {
partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" to earliestStartPosition
for _, seg := range levelZeroSegments {
key := buildGroupKey(seg.PartitionID, seg.InsertChannel)
func (m *CompactionViewManager) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) map[string]*LevelZeroSegmentsView {
partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" as key
for _, view := range levelZeroSegments {
key := view.label.Key()
if _, ok := partChanView[key]; !ok {
label := &CompactionGroupLabel{
CollectionID: collectionID,
PartitionID: seg.PartitionID,
Channel: seg.InsertChannel,
}
partChanView[key] = &LevelZeroSegmentsView{
label: label,
segments: []*SegmentView{},
earliestGrowingSegmentPos: m.meta.GetEarliestStartPositionOfGrowingSegments(label),
label: view.label,
segments: []*SegmentView{view},
earliestGrowingSegmentPos: m.meta.GetEarliestStartPositionOfGrowingSegments(view.label),
}
} else {
partChanView[key].Append(view)
}
partChanView[key].segments = append(partChanView[key].segments, GetSegmentViews(seg)[0])
}
return lo.Values(partChanView)
return partChanView
}
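
Net effect of the refactor: Check builds SegmentViews once via GetViewsByInfo, then asks only for the groups that changed. A minimal sketch of that step (the wrapper function is illustrative):

// From a collection's L0 segment infos to the changed per-partition-channel views.
func checkFlowSketch(m *CompactionViewManager, collID UniqueID, segs []*SegmentInfo) []CompactionView {
    latestL0Views := GetViewsByInfo(segs...)                 // SegmentInfo -> SegmentView, built once
    return m.getChangedLevelZeroViews(collID, latestL0Views) // groups per part-chan, diffed against the cache
}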


@ -104,26 +104,7 @@ func (s *CompactionViewManagerSuite) TestCheck() {
paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "true")
defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key)
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Times(3)
// nothing in the view, just store in the first check
s.Empty(s.m.view.collections)
s.m.Check()
for _, views := range s.m.view.collections {
for _, view := range views {
s.Equal(datapb.SegmentLevel_L0, view.Level)
s.Equal(commonpb.SegmentState_Flushed, view.State)
log.Info("String", zap.String("segment", view.String()))
log.Info("LevelZeroString", zap.String("segment", view.LevelZeroString()))
}
}
// change of meta
addInfo := genTestSegmentInfo(s.testLabel, 19530, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed)
addInfo.Deltalogs = genTestDeltalogs(1, 10)
s.m.meta.Lock()
s.m.meta.segments.segments[addInfo.GetID()] = addInfo
s.m.meta.Unlock()
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Times(2)
s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything).
Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) {
s.EqualValues(1, taskID)
@ -133,7 +114,7 @@ func (s *CompactionViewManagerSuite) TestCheck() {
s.True(ok)
s.NotNil(v)
expectedSegs := []int64{100, 101, 102, 103, 19530}
expectedSegs := []int64{100, 101, 102, 103}
gotSegs := lo.Map(v.segments, func(s *SegmentView, _ int) int64 { return s.ID })
s.ElementsMatch(expectedSegs, gotSegs)
@ -141,8 +122,22 @@ func (s *CompactionViewManagerSuite) TestCheck() {
log.Info("All views", zap.String("l0 view", v.String()))
}).Once()
// nothing in the view before the test
s.Empty(s.m.view.collections)
s.m.Check()
s.m.viewGuard.Lock()
views := s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil)
s.m.viewGuard.Unlock()
s.Equal(4, len(views))
for _, view := range views {
s.EqualValues(s.testLabel, view.label)
s.Equal(datapb.SegmentLevel_L0, view.Level)
s.Equal(commonpb.SegmentState_Flushed, view.State)
log.Info("String", zap.String("segment", view.String()))
log.Info("LevelZeroString", zap.String("segment", view.LevelZeroString()))
}
// clear meta
s.m.meta.Lock()
s.m.meta.segments.segments = make(map[int64]*SegmentInfo)


@ -20,6 +20,7 @@ package datacoord
import (
"context"
"fmt"
"math"
"path"
"sync"
"time"
@ -958,30 +959,25 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
}
// PrepareCompleteCompactionMutation returns
// - the segment info of compactedFrom segments before compaction to revert
// - the segment info of compactedFrom segments after compaction to alter
// - the segment info of compactedTo segment after compaction to add
// The compactedTo segment could contain 0 numRows
// TODO: too complicated
func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
result *datapb.CompactionPlanResult,
) ([]*SegmentInfo, []*SegmentInfo, *SegmentInfo, *segMetricMutation, error) {
) ([]*SegmentInfo, *SegmentInfo, *segMetricMutation, error) {
log.Info("meta update: prepare for complete compaction mutation")
compactionLogs := plan.GetSegmentBinlogs()
m.Lock()
defer m.Unlock()
var (
oldSegments = make([]*SegmentInfo, 0, len(compactionLogs))
modSegments = make([]*SegmentInfo, 0, len(compactionLogs))
)
modSegments := make([]*SegmentInfo, 0, len(compactionLogs))
metricMutation := &segMetricMutation{
stateChange: make(map[string]map[string]int),
}
for _, cl := range compactionLogs {
if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil {
oldSegments = append(oldSegments, segment.Clone())
cloned := segment.Clone()
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
cloned.DroppedAt = uint64(time.Now().UnixNano())
@ -1017,10 +1013,10 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
// MixCompaction / MergeCompaction will generate one and only one segment
compactToSegment := result.GetSegments()[0]
newAddedDeltalogs := m.updateDeltalogs(originDeltalogs, deletedDeltalogs, nil)
newAddedDeltalogs := updateDeltalogs(originDeltalogs, deletedDeltalogs, nil)
copiedDeltalogs, err := m.copyDeltaFiles(newAddedDeltalogs, modSegments[0].CollectionID, modSegments[0].PartitionID, compactToSegment.GetSegmentID())
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, err
}
deltalogs := append(compactToSegment.GetDeltalogs(), copiedDeltalogs...)
@ -1055,7 +1051,7 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
zap.Int64("new segment num of rows", segment.GetNumOfRows()),
zap.Any("compacted from", segment.GetCompactionFrom()))
return oldSegments, modSegments, segment, metricMutation, nil
return modSegments, segment, metricMutation, nil
}
func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, partitionID, targetSegmentID int64) ([]*datapb.FieldBinlog, error) {
@ -1174,7 +1170,7 @@ func (m *meta) updateBinlogs(origin []*datapb.FieldBinlog, removes []*datapb.Fie
return res
}
func (m *meta) updateDeltalogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog, adds []*datapb.FieldBinlog) []*datapb.FieldBinlog {
func updateDeltalogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog, adds []*datapb.FieldBinlog) []*datapb.FieldBinlog {
res := make([]*datapb.FieldBinlog, 0, len(origin))
for _, fbl := range origin {
logs := make(map[string]*datapb.Binlog)
@ -1332,7 +1328,7 @@ func (m *meta) GetEarliestStartPositionOfGrowingSegments(label *CompactionGroupL
segment.GetInsertChannel() == label.Channel
})
var earliest *msgpb.MsgPosition
earliest := &msgpb.MsgPosition{Timestamp: math.MaxUint64}
for _, seg := range segments {
if earliest == nil || earliest.GetTimestamp() > seg.GetStartPosition().GetTimestamp() {
earliest = seg.GetStartPosition()
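
A note on the earliest-start-position fix above: seeding earliest with a math.MaxUint64 sentinel makes every real start position win the first comparison, so the min-scan needs no nil special case. The pattern in isolation, as a hedged sketch rather than the exact repo code:

// Min-scan with a sentinel: any real timestamp is below MaxUint64.
func earliestStartPositionSketch(segments []*SegmentInfo) *msgpb.MsgPosition {
    earliest := &msgpb.MsgPosition{Timestamp: math.MaxUint64}
    for _, seg := range segments {
        if pos := seg.GetStartPosition(); pos != nil && earliest.GetTimestamp() > pos.GetTimestamp() {
            earliest = pos
        }
    }
    return earliest
}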


@ -726,21 +726,14 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
inCompactionResult := &datapb.CompactionPlanResult{
Segments: []*datapb.CompactionSegment{inSegment},
}
beforeCompact, afterCompact, newSegment, metricMutation, err := m.PrepareCompleteCompactionMutation(plan, inCompactionResult)
afterCompact, newSegment, metricMutation, err := m.PrepareCompleteCompactionMutation(plan, inCompactionResult)
assert.NoError(t, err)
assert.NotNil(t, beforeCompact)
assert.NotNil(t, afterCompact)
assert.NotNil(t, newSegment)
assert.Equal(t, 3, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()]))
assert.Equal(t, int64(0), metricMutation.rowCountChange)
assert.Equal(t, int64(2), metricMutation.rowCountAccChange)
require.Equal(t, 2, len(beforeCompact))
assert.Equal(t, commonpb.SegmentState_Flushed, beforeCompact[0].GetState())
assert.Equal(t, commonpb.SegmentState_Flushed, beforeCompact[1].GetState())
assert.Zero(t, beforeCompact[0].GetDroppedAt())
assert.Zero(t, beforeCompact[1].GetDroppedAt())
require.Equal(t, 2, len(afterCompact))
assert.Equal(t, commonpb.SegmentState_Dropped, afterCompact[0].GetState())
assert.Equal(t, commonpb.SegmentState_Dropped, afterCompact[1].GetState())


@ -0,0 +1,328 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package datacoord
import (
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
mock "github.com/stretchr/testify/mock"
)
// MockCompactionMeta is an autogenerated mock type for the CompactionMeta type
type MockCompactionMeta struct {
mock.Mock
}
type MockCompactionMeta_Expecter struct {
mock *mock.Mock
}
func (_m *MockCompactionMeta) EXPECT() *MockCompactionMeta_Expecter {
return &MockCompactionMeta_Expecter{mock: &_m.Mock}
}
// GetHealthySegment provides a mock function with given fields: segID
func (_m *MockCompactionMeta) GetHealthySegment(segID int64) *SegmentInfo {
ret := _m.Called(segID)
var r0 *SegmentInfo
if rf, ok := ret.Get(0).(func(int64) *SegmentInfo); ok {
r0 = rf(segID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*SegmentInfo)
}
}
return r0
}
// MockCompactionMeta_GetHealthySegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHealthySegment'
type MockCompactionMeta_GetHealthySegment_Call struct {
*mock.Call
}
// GetHealthySegment is a helper method to define mock.On call
// - segID int64
func (_e *MockCompactionMeta_Expecter) GetHealthySegment(segID interface{}) *MockCompactionMeta_GetHealthySegment_Call {
return &MockCompactionMeta_GetHealthySegment_Call{Call: _e.mock.On("GetHealthySegment", segID)}
}
func (_c *MockCompactionMeta_GetHealthySegment_Call) Run(run func(segID int64)) *MockCompactionMeta_GetHealthySegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockCompactionMeta_GetHealthySegment_Call) Return(_a0 *SegmentInfo) *MockCompactionMeta_GetHealthySegment_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockCompactionMeta_GetHealthySegment_Call) RunAndReturn(run func(int64) *SegmentInfo) *MockCompactionMeta_GetHealthySegment_Call {
_c.Call.Return(run)
return _c
}
// PrepareCompleteCompactionMutation provides a mock function with given fields: plan, result
func (_m *MockCompactionMeta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *SegmentInfo, *segMetricMutation, error) {
ret := _m.Called(plan, result)
var r0 []*SegmentInfo
var r1 *SegmentInfo
var r2 *segMetricMutation
var r3 error
if rf, ok := ret.Get(0).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) ([]*SegmentInfo, *SegmentInfo, *segMetricMutation, error)); ok {
return rf(plan, result)
}
if rf, ok := ret.Get(0).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) []*SegmentInfo); ok {
r0 = rf(plan, result)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*SegmentInfo)
}
}
if rf, ok := ret.Get(1).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) *SegmentInfo); ok {
r1 = rf(plan, result)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*SegmentInfo)
}
}
if rf, ok := ret.Get(2).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) *segMetricMutation); ok {
r2 = rf(plan, result)
} else {
if ret.Get(2) != nil {
r2 = ret.Get(2).(*segMetricMutation)
}
}
if rf, ok := ret.Get(3).(func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) error); ok {
r3 = rf(plan, result)
} else {
r3 = ret.Error(3)
}
return r0, r1, r2, r3
}
// MockCompactionMeta_PrepareCompleteCompactionMutation_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PrepareCompleteCompactionMutation'
type MockCompactionMeta_PrepareCompleteCompactionMutation_Call struct {
*mock.Call
}
// PrepareCompleteCompactionMutation is a helper method to define mock.On call
// - plan *datapb.CompactionPlan
// - result *datapb.CompactionPlanResult
func (_e *MockCompactionMeta_Expecter) PrepareCompleteCompactionMutation(plan interface{}, result interface{}) *MockCompactionMeta_PrepareCompleteCompactionMutation_Call {
return &MockCompactionMeta_PrepareCompleteCompactionMutation_Call{Call: _e.mock.On("PrepareCompleteCompactionMutation", plan, result)}
}
func (_c *MockCompactionMeta_PrepareCompleteCompactionMutation_Call) Run(run func(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult)) *MockCompactionMeta_PrepareCompleteCompactionMutation_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*datapb.CompactionPlan), args[1].(*datapb.CompactionPlanResult))
})
return _c
}
func (_c *MockCompactionMeta_PrepareCompleteCompactionMutation_Call) Return(_a0 []*SegmentInfo, _a1 *SegmentInfo, _a2 *segMetricMutation, _a3 error) *MockCompactionMeta_PrepareCompleteCompactionMutation_Call {
_c.Call.Return(_a0, _a1, _a2, _a3)
return _c
}
func (_c *MockCompactionMeta_PrepareCompleteCompactionMutation_Call) RunAndReturn(run func(*datapb.CompactionPlan, *datapb.CompactionPlanResult) ([]*SegmentInfo, *SegmentInfo, *segMetricMutation, error)) *MockCompactionMeta_PrepareCompleteCompactionMutation_Call {
_c.Call.Return(run)
return _c
}
// SelectSegments provides a mock function with given fields: selector
func (_m *MockCompactionMeta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
ret := _m.Called(selector)
var r0 []*SegmentInfo
if rf, ok := ret.Get(0).(func(SegmentInfoSelector) []*SegmentInfo); ok {
r0 = rf(selector)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*SegmentInfo)
}
}
return r0
}
// MockCompactionMeta_SelectSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SelectSegments'
type MockCompactionMeta_SelectSegments_Call struct {
*mock.Call
}
// SelectSegments is a helper method to define mock.On call
// - selector SegmentInfoSelector
func (_e *MockCompactionMeta_Expecter) SelectSegments(selector interface{}) *MockCompactionMeta_SelectSegments_Call {
return &MockCompactionMeta_SelectSegments_Call{Call: _e.mock.On("SelectSegments", selector)}
}
func (_c *MockCompactionMeta_SelectSegments_Call) Run(run func(selector SegmentInfoSelector)) *MockCompactionMeta_SelectSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(SegmentInfoSelector))
})
return _c
}
func (_c *MockCompactionMeta_SelectSegments_Call) Return(_a0 []*SegmentInfo) *MockCompactionMeta_SelectSegments_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockCompactionMeta_SelectSegments_Call) RunAndReturn(run func(SegmentInfoSelector) []*SegmentInfo) *MockCompactionMeta_SelectSegments_Call {
_c.Call.Return(run)
return _c
}
// SetSegmentCompacting provides a mock function with given fields: segmentID, compacting
func (_m *MockCompactionMeta) SetSegmentCompacting(segmentID int64, compacting bool) {
_m.Called(segmentID, compacting)
}
// MockCompactionMeta_SetSegmentCompacting_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSegmentCompacting'
type MockCompactionMeta_SetSegmentCompacting_Call struct {
*mock.Call
}
// SetSegmentCompacting is a helper method to define mock.On call
// - segmentID int64
// - compacting bool
func (_e *MockCompactionMeta_Expecter) SetSegmentCompacting(segmentID interface{}, compacting interface{}) *MockCompactionMeta_SetSegmentCompacting_Call {
return &MockCompactionMeta_SetSegmentCompacting_Call{Call: _e.mock.On("SetSegmentCompacting", segmentID, compacting)}
}
func (_c *MockCompactionMeta_SetSegmentCompacting_Call) Run(run func(segmentID int64, compacting bool)) *MockCompactionMeta_SetSegmentCompacting_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(bool))
})
return _c
}
func (_c *MockCompactionMeta_SetSegmentCompacting_Call) Return() *MockCompactionMeta_SetSegmentCompacting_Call {
_c.Call.Return()
return _c
}
func (_c *MockCompactionMeta_SetSegmentCompacting_Call) RunAndReturn(run func(int64, bool)) *MockCompactionMeta_SetSegmentCompacting_Call {
_c.Call.Return(run)
return _c
}
// UpdateSegmentsInfo provides a mock function with given fields: operators
func (_m *MockCompactionMeta) UpdateSegmentsInfo(operators ...UpdateOperator) error {
_va := make([]interface{}, len(operators))
for _i := range operators {
_va[_i] = operators[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(...UpdateOperator) error); ok {
r0 = rf(operators...)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockCompactionMeta_UpdateSegmentsInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSegmentsInfo'
type MockCompactionMeta_UpdateSegmentsInfo_Call struct {
*mock.Call
}
// UpdateSegmentsInfo is a helper method to define mock.On call
// - operators ...UpdateOperator
func (_e *MockCompactionMeta_Expecter) UpdateSegmentsInfo(operators ...interface{}) *MockCompactionMeta_UpdateSegmentsInfo_Call {
return &MockCompactionMeta_UpdateSegmentsInfo_Call{Call: _e.mock.On("UpdateSegmentsInfo",
append([]interface{}{}, operators...)...)}
}
func (_c *MockCompactionMeta_UpdateSegmentsInfo_Call) Run(run func(operators ...UpdateOperator)) *MockCompactionMeta_UpdateSegmentsInfo_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]UpdateOperator, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(UpdateOperator)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *MockCompactionMeta_UpdateSegmentsInfo_Call) Return(_a0 error) *MockCompactionMeta_UpdateSegmentsInfo_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockCompactionMeta_UpdateSegmentsInfo_Call) RunAndReturn(run func(...UpdateOperator) error) *MockCompactionMeta_UpdateSegmentsInfo_Call {
_c.Call.Return(run)
return _c
}
// alterMetaStoreAfterCompaction provides a mock function with given fields: segmentCompactTo, segmentsCompactFrom
func (_m *MockCompactionMeta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) error {
ret := _m.Called(segmentCompactTo, segmentsCompactFrom)
var r0 error
if rf, ok := ret.Get(0).(func(*SegmentInfo, []*SegmentInfo) error); ok {
r0 = rf(segmentCompactTo, segmentsCompactFrom)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockCompactionMeta_alterMetaStoreAfterCompaction_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'alterMetaStoreAfterCompaction'
type MockCompactionMeta_alterMetaStoreAfterCompaction_Call struct {
*mock.Call
}
// alterMetaStoreAfterCompaction is a helper method to define mock.On call
// - segmentCompactTo *SegmentInfo
// - segmentsCompactFrom []*SegmentInfo
func (_e *MockCompactionMeta_Expecter) alterMetaStoreAfterCompaction(segmentCompactTo interface{}, segmentsCompactFrom interface{}) *MockCompactionMeta_alterMetaStoreAfterCompaction_Call {
return &MockCompactionMeta_alterMetaStoreAfterCompaction_Call{Call: _e.mock.On("alterMetaStoreAfterCompaction", segmentCompactTo, segmentsCompactFrom)}
}
func (_c *MockCompactionMeta_alterMetaStoreAfterCompaction_Call) Run(run func(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo)) *MockCompactionMeta_alterMetaStoreAfterCompaction_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*SegmentInfo), args[1].([]*SegmentInfo))
})
return _c
}
func (_c *MockCompactionMeta_alterMetaStoreAfterCompaction_Call) Return(_a0 error) *MockCompactionMeta_alterMetaStoreAfterCompaction_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockCompactionMeta_alterMetaStoreAfterCompaction_Call) RunAndReturn(run func(*SegmentInfo, []*SegmentInfo) error) *MockCompactionMeta_alterMetaStoreAfterCompaction_Call {
_c.Call.Return(run)
return _c
}
// NewMockCompactionMeta creates a new instance of MockCompactionMeta. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockCompactionMeta(t interface {
mock.TestingT
Cleanup(func())
}) *MockCompactionMeta {
mock := &MockCompactionMeta{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
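
Usage of this generated mock follows the standard mockery expecter pattern, as the new handler tests above demonstrate. A minimal, illustrative test (not part of this commit):

package datacoord

import (
    "testing"

    "github.com/stretchr/testify/mock"
)

func TestMockCompactionMetaSketch(t *testing.T) {
    meta := NewMockCompactionMeta(t) // cleanup asserts all expectations were met
    meta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
    if seg := meta.GetHealthySegment(42); seg != nil {
        t.Fatal("expected nil segment")
    }
}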


@ -0,0 +1,228 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package datacoord
import mock "github.com/stretchr/testify/mock"
// MockScheduler is an autogenerated mock type for the Scheduler type
type MockScheduler struct {
mock.Mock
}
type MockScheduler_Expecter struct {
mock *mock.Mock
}
func (_m *MockScheduler) EXPECT() *MockScheduler_Expecter {
return &MockScheduler_Expecter{mock: &_m.Mock}
}
// Finish provides a mock function with given fields: nodeID, planID
func (_m *MockScheduler) Finish(nodeID int64, planID int64) {
_m.Called(nodeID, planID)
}
// MockScheduler_Finish_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Finish'
type MockScheduler_Finish_Call struct {
*mock.Call
}
// Finish is a helper method to define mock.On call
// - nodeID int64
// - planID int64
func (_e *MockScheduler_Expecter) Finish(nodeID interface{}, planID interface{}) *MockScheduler_Finish_Call {
return &MockScheduler_Finish_Call{Call: _e.mock.On("Finish", nodeID, planID)}
}
func (_c *MockScheduler_Finish_Call) Run(run func(nodeID int64, planID int64)) *MockScheduler_Finish_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
})
return _c
}
func (_c *MockScheduler_Finish_Call) Return() *MockScheduler_Finish_Call {
_c.Call.Return()
return _c
}
func (_c *MockScheduler_Finish_Call) RunAndReturn(run func(int64, int64)) *MockScheduler_Finish_Call {
_c.Call.Return(run)
return _c
}
// GetTaskCount provides a mock function with given fields:
func (_m *MockScheduler) GetTaskCount() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// MockScheduler_GetTaskCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTaskCount'
type MockScheduler_GetTaskCount_Call struct {
*mock.Call
}
// GetTaskCount is a helper method to define mock.On call
func (_e *MockScheduler_Expecter) GetTaskCount() *MockScheduler_GetTaskCount_Call {
return &MockScheduler_GetTaskCount_Call{Call: _e.mock.On("GetTaskCount")}
}
func (_c *MockScheduler_GetTaskCount_Call) Run(run func()) *MockScheduler_GetTaskCount_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScheduler_GetTaskCount_Call) Return(_a0 int) *MockScheduler_GetTaskCount_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScheduler_GetTaskCount_Call) RunAndReturn(run func() int) *MockScheduler_GetTaskCount_Call {
_c.Call.Return(run)
return _c
}
// LogStatus provides a mock function with given fields:
func (_m *MockScheduler) LogStatus() {
_m.Called()
}
// MockScheduler_LogStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LogStatus'
type MockScheduler_LogStatus_Call struct {
*mock.Call
}
// LogStatus is a helper method to define mock.On call
func (_e *MockScheduler_Expecter) LogStatus() *MockScheduler_LogStatus_Call {
return &MockScheduler_LogStatus_Call{Call: _e.mock.On("LogStatus")}
}
func (_c *MockScheduler_LogStatus_Call) Run(run func()) *MockScheduler_LogStatus_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScheduler_LogStatus_Call) Return() *MockScheduler_LogStatus_Call {
_c.Call.Return()
return _c
}
func (_c *MockScheduler_LogStatus_Call) RunAndReturn(run func()) *MockScheduler_LogStatus_Call {
_c.Call.Return(run)
return _c
}
// Schedule provides a mock function with given fields:
func (_m *MockScheduler) Schedule() []*compactionTask {
ret := _m.Called()
var r0 []*compactionTask
if rf, ok := ret.Get(0).(func() []*compactionTask); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*compactionTask)
}
}
return r0
}
// MockScheduler_Schedule_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Schedule'
type MockScheduler_Schedule_Call struct {
*mock.Call
}
// Schedule is a helper method to define mock.On call
func (_e *MockScheduler_Expecter) Schedule() *MockScheduler_Schedule_Call {
return &MockScheduler_Schedule_Call{Call: _e.mock.On("Schedule")}
}
func (_c *MockScheduler_Schedule_Call) Run(run func()) *MockScheduler_Schedule_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScheduler_Schedule_Call) Return(_a0 []*compactionTask) *MockScheduler_Schedule_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScheduler_Schedule_Call) RunAndReturn(run func() []*compactionTask) *MockScheduler_Schedule_Call {
_c.Call.Return(run)
return _c
}
// Submit provides a mock function with given fields: t
func (_m *MockScheduler) Submit(t ...*compactionTask) {
_va := make([]interface{}, len(t))
for _i := range t {
_va[_i] = t[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
_m.Called(_ca...)
}
// MockScheduler_Submit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Submit'
type MockScheduler_Submit_Call struct {
*mock.Call
}
// Submit is a helper method to define mock.On call
// - t ...*compactionTask
func (_e *MockScheduler_Expecter) Submit(t ...interface{}) *MockScheduler_Submit_Call {
return &MockScheduler_Submit_Call{Call: _e.mock.On("Submit",
append([]interface{}{}, t...)...)}
}
func (_c *MockScheduler_Submit_Call) Run(run func(t ...*compactionTask)) *MockScheduler_Submit_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]*compactionTask, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(*compactionTask)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *MockScheduler_Submit_Call) Return() *MockScheduler_Submit_Call {
_c.Call.Return()
return _c
}
func (_c *MockScheduler_Submit_Call) RunAndReturn(run func(...*compactionTask)) *MockScheduler_Submit_Call {
_c.Call.Return(run)
return _c
}
// NewMockScheduler creates a new instance of MockScheduler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockScheduler(t interface {
mock.TestingT
Cleanup(func())
}) *MockScheduler {
mock := &MockScheduler{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}


@ -122,8 +122,9 @@ type Server struct {
gcOpt GcOption
handler Handler
compactionTrigger trigger
compactionHandler compactionPlanContext
compactionTrigger trigger
compactionHandler compactionPlanContext
compactionViewManager *CompactionViewManager
metricsCacheManager *metricsinfo.MetricsCacheManager
@ -401,6 +402,7 @@ func (s *Server) startDataCoord() {
if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.compactionHandler.start()
s.compactionTrigger.start()
s.compactionViewManager.Start()
}
s.startServerLoop()
s.afterStart()
@ -455,10 +457,13 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ
func (s *Server) createCompactionHandler() {
s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator)
triggerv2 := NewCompactionTriggerManager(s.meta, s.allocator, s.compactionHandler)
s.compactionViewManager = NewCompactionViewManager(s.meta, triggerv2, s.allocator)
}
func (s *Server) stopCompactionHandler() {
s.compactionHandler.stop()
s.compactionViewManager.Close()
}
func (s *Server) createCompactionTrigger() {


@ -513,10 +513,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
s.flushCh <- req.SegmentID
if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
if segment == nil && req.GetSegLevel() == datapb.SegmentLevel_L0 {
err = s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(),
segmentID, req.GetChannel())
} else {
if req.GetSegLevel() != datapb.SegmentLevel_L0 {
err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
segmentID, segment.GetInsertChannel())
}
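
The rewritten branch means a flushed L0 segment no longer reaches triggerSingleCompaction at all; the CompactionViewManager loop added in this commit picks it up instead. The post-change logic, condensed into an illustrative helper:

// Only non-L0 flushes fan into the single-compaction trigger now.
func maybeTriggerSingleCompactionSketch(s *Server, req *datapb.SaveBinlogPathsRequest, segment *SegmentInfo) error {
    if req.GetSegLevel() == datapb.SegmentLevel_L0 {
        return nil // L0 flushes are handled by the CompactionViewManager check loop
    }
    return s.compactionTrigger.triggerSingleCompaction(
        segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID(), segment.GetInsertChannel())
}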