fix: [2.5] Set isStating to ensure mutual exclusion between L0 compacting and stats (#39490)

issue: #39333 

master pr: #39489

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/39500/head
cai.zhang 2025-01-22 10:27:05 +08:00 committed by GitHub
parent c1c1f2df4e
commit e46c8ba7fb
21 changed files with 480 additions and 74 deletions
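
For orientation (not part of the diff): this patch makes stats tasks and L0 delete compactions claim sealed segments under mutual exclusion. A stats task refuses to start when an executing L0 plan already covers its segment (checkAndSetSegmentStating), and an L0 plan refuses to be scheduled when any of its target segments is already stating (PreparePlan via CheckSegmentsStating). The sketch below is a minimal Go model of that invariant using simplified, hypothetical types; it is not the Milvus API.

// Simplified model of the invariant introduced by this patch: a segment may be
// referenced by an executing L0 compaction plan or be owned by a stats task,
// but never both. All names here are illustrative.
package main

import (
	"fmt"
	"sync"
)

type exclusionGuard struct {
	mu          sync.Mutex
	stating     map[int64]bool // segments currently owned by a stats task
	l0Compacted map[int64]bool // segments referenced by executing L0 plans
}

func newExclusionGuard() *exclusionGuard {
	return &exclusionGuard{stating: map[int64]bool{}, l0Compacted: map[int64]bool{}}
}

// checkAndSetSegmentStating mirrors the stats-side gate: refuse if an executing
// L0 plan already covers the segment, otherwise mark it stating.
func (g *exclusionGuard) checkAndSetSegmentStating(segmentID int64) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.l0Compacted[segmentID] {
		return false
	}
	g.stating[segmentID] = true
	return true
}

// prepareL0Plan mirrors the compaction-side gate: refuse to admit an L0 plan
// whose target sealed segments include one that is currently stating.
func (g *exclusionGuard) prepareL0Plan(sealedSegments []int64) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	for _, id := range sealedSegments {
		if g.stating[id] {
			return false
		}
	}
	for _, id := range sealedSegments {
		g.l0Compacted[id] = true
	}
	return true
}

func main() {
	g := newExclusionGuard()
	fmt.Println(g.checkAndSetSegmentStating(100)) // true: a stats task now owns segment 100
	fmt.Println(g.prepareL0Plan([]int64{100, 101})) // false: 100 is stating, the plan must wait
	fmt.Println(g.prepareL0Plan([]int64{101}))      // true: 101 is free
	fmt.Println(g.checkAndSetSegmentStating(101))   // false: 101 is now covered by an L0 plan
}

In the patch itself, both gates run while the plan handler's executingGuard is held (see the "Do not move this check logic outside the lock" comment in schedule()), which is what keeps the two checks from interleaving.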

View File

@ -60,6 +60,8 @@ type compactionPlanContext interface {
getCompactionTasksNumBySignalID(signalID int64) int
getCompactionInfo(ctx context.Context, signalID int64) *compactionInfo
removeTasksByChannel(channel string)
setTaskScheduler(scheduler *taskScheduler)
checkAndSetSegmentStating(segmentID int64) bool
}
var (
@ -165,6 +167,21 @@ func summaryCompactionState(tasks []*datapb.CompactionTask) *compactionInfo {
return ret
}
func (c *compactionPlanHandler) checkAndSetSegmentStating(segmentID int64) bool {
c.executingGuard.Lock()
defer c.executingGuard.Unlock()
for _, t := range c.executingTasks {
if t.GetTaskProto().GetType() == datapb.CompactionType_Level0DeleteCompaction {
if t.CheckCompactionContainsSegment(segmentID) {
return false
}
}
}
c.meta.SetSegmentStating(segmentID, true)
return true
}
func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) int {
cnt := 0
c.queueTasks.ForEach(func(ct CompactionTask) {
@ -183,25 +200,28 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64)
}
func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, meta CompactionMeta,
allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler,
allocator allocator.Allocator, handler Handler,
) *compactionPlanHandler {
// Higher capacity will have better ordering in priority, but consumes more memory.
// TODO[GOOSE]: Higher capacity makes tasks waiting longer, which need to be get rid of.
capacity := paramtable.Get().DataCoordCfg.CompactionTaskQueueCapacity.GetAsInt()
return &compactionPlanHandler{
queueTasks: *NewCompactionQueue(capacity, getPrioritizer()),
meta: meta,
sessions: sessions,
allocator: allocator,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
cleaningTasks: make(map[int64]CompactionTask),
analyzeScheduler: analyzeScheduler,
handler: handler,
queueTasks: *NewCompactionQueue(capacity, getPrioritizer()),
meta: meta,
sessions: sessions,
allocator: allocator,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
cleaningTasks: make(map[int64]CompactionTask),
handler: handler,
}
}
func (c *compactionPlanHandler) setTaskScheduler(scheduler *taskScheduler) {
c.analyzeScheduler = scheduler
}
func (c *compactionPlanHandler) schedule() []CompactionTask {
selected := make([]CompactionTask, 0)
if c.queueTasks.Len() == 0 {
@ -304,6 +324,13 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
}
c.executingGuard.Lock()
// Do not move this check logic outside the lock; it needs to remain mutually exclusive with the stats task.
if t.GetTaskProto().GetType() == datapb.CompactionType_Level0DeleteCompaction {
if !t.PreparePlan() {
c.executingGuard.Unlock()
continue
}
}
c.executingTasks[t.GetTaskProto().GetPlanID()] = t
if len(c.executingTasks) >= parallelism {
c.executingGuard.Unlock()
@ -317,7 +344,6 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
}
func (c *compactionPlanHandler) start() {
c.loadMeta()
c.stopWg.Add(3)
go c.loopSchedule()
go c.loopCheck()

View File

@ -48,6 +48,9 @@ type CompactionTask interface {
GetSpan() trace.Span
SetSpan(trace.Span)
SaveTaskMeta() error
PreparePlan() bool
CheckCompactionContainsSegment(segmentID int64) bool
}
type compactionTaskOpt func(task *datapb.CompactionTask)

View File

@ -181,6 +181,14 @@ func (t *clusteringCompactionTask) Clean() bool {
return t.doClean() == nil
}
func (t *clusteringCompactionTask) PreparePlan() bool {
return true
}
func (t *clusteringCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool {
return false
}
func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {

View File

@ -85,7 +85,7 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() {
s.mockSessionMgr = session.NewMockDataNodeManager(s.T())
scheduler := newTaskScheduler(ctx, s.meta, nil, cm, newIndexEngineVersionManager(), nil, nil)
scheduler := newTaskScheduler(ctx, s.meta, nil, cm, newIndexEngineVersionManager(), nil, nil, nil)
s.analyzeScheduler = scheduler
}

View File

@ -262,6 +262,54 @@ func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compac
return taskClone
}
func (t *l0CompactionTask) selectSealedSegment() ([]int64, []*datapb.CompactionSegmentBinlogs) {
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
// Select sealed L1 segments for LevelZero compaction that meets the condition:
// dmlPos < triggerInfo.pos
sealedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) &&
info.GetInsertChannel() == taskProto.GetChannel() &&
isFlushState(info.GetState()) &&
!info.GetIsImporting() &&
info.GetLevel() != datapb.SegmentLevel_L0 &&
info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp()
}))
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{
SegmentID: info.GetID(),
Field2StatslogPaths: info.GetStatslogs(),
InsertChannel: info.GetInsertChannel(),
Level: info.GetLevel(),
CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(),
IsSorted: info.GetIsSorted(),
}
})
sealedSegmentIDs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) int64 {
return info.GetID()
})
return sealedSegmentIDs, sealedSegBinlogs
}
func (t *l0CompactionTask) CheckCompactionContainsSegment(segmentID int64) bool {
sealedSegmentIDs, _ := t.selectSealedSegment()
for _, sealedSegmentID := range sealedSegmentIDs {
if sealedSegmentID == segmentID {
return true
}
}
return false
}
func (t *l0CompactionTask) PreparePlan() bool {
sealedSegmentIDs, _ := t.selectSealedSegment()
exist, hasStating := t.meta.CheckSegmentsStating(context.TODO(), sealedSegmentIDs)
return exist && !hasStating
}
func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
@ -298,35 +346,13 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
})
}
// Select sealed L1 segments for LevelZero compaction that meets the condition:
// dmlPos < triggerInfo.pos
sealedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) &&
info.GetInsertChannel() == plan.GetChannel() &&
isFlushState(info.GetState()) &&
!info.GetIsImporting() &&
info.GetLevel() != datapb.SegmentLevel_L0 &&
info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp()
}))
if len(sealedSegments) == 0 {
sealedSegmentIDs, sealedSegBinlogs := t.selectSealedSegment()
if len(sealedSegmentIDs) == 0 {
// TODO fast finish l0 segment, just drop l0 segment
log.Info("l0Compaction available non-L0 Segments is empty ")
return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos())
}
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{
SegmentID: info.GetID(),
Field2StatslogPaths: info.GetStatslogs(),
InsertChannel: info.GetInsertChannel(),
Level: info.GetLevel(),
CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(),
IsSorted: info.GetIsSorted(),
}
})
plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...)
log.Info("l0CompactionTask refreshed level zero compaction plan",
zap.Any("target position", taskProto.GetPos()),

View File

@ -316,6 +316,14 @@ func (t *mixCompactionTask) SetSpan(span trace.Span) {
t.span = span
}
func (t *mixCompactionTask) PreparePlan() bool {
return true
}
func (t *mixCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool {
return false
}
func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
beginLogID, _, err := t.allocator.AllocN(1)

View File

@ -60,7 +60,7 @@ func (s *CompactionPlanHandlerSuite) SetupTest() {
s.mockCm = NewMockChannelManager(s.T())
s.mockSessMgr = session.NewMockDataNodeManager(s.T())
s.cluster = NewMockCluster(s.T())
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil)
s.mockHandler = NewNMockHandler(s.T())
s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe()
}
@ -132,6 +132,22 @@ func (s *CompactionPlanHandlerSuite) generateInitTasksForSchedule() {
func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
// dataNode 101's paralleTasks has 1 task running, not L0 task
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{
{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
isStating: false,
},
})
s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false)
tests := []struct {
description string
tasks []CompactionTask
@ -237,6 +253,22 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
}
func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() {
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{
{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
isStating: false,
},
})
s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false)
tests := []struct {
description string
tasks []CompactionTask
@ -313,6 +345,23 @@ func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() {
func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
// dataNode 102's paralleTasks has running L0 tasks
// nothing of the same channel will be able to schedule
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{
{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
isStating: false,
},
})
s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false)
tests := []struct {
description string
tasks []CompactionTask
@ -618,6 +667,22 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
}
return ret
})
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{
{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
isStating: false,
},
})
s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false)
for _, t := range inTasks {
s.handler.submitTask(t)
@ -636,7 +701,7 @@ func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() {
paramtable.Get().Save("dataCoord.compaction.taskQueueCapacity", "1")
defer paramtable.Get().Reset("dataCoord.compaction.taskQueueCapacity")
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil)
t1 := newMixCompactionTask(&datapb.CompactionTask{
TriggerID: 1,
@ -672,7 +737,7 @@ func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() {
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.SetupTest()
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything, mock.Anything).Return(true, true).Maybe()
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil)
task := &datapb.CompactionTask{
TriggerID: 1,

View File

@ -58,6 +58,9 @@ func (h *spyCompactionHandler) getCompactionInfo(ctx context.Context, signalID i
return nil
}
func (h *spyCompactionHandler) setTaskScheduler(scheduler *taskScheduler) {
}
var _ compactionPlanContext = (*spyCompactionHandler)(nil)
func (h *spyCompactionHandler) removeTasksByChannel(channel string) {}
@ -72,6 +75,10 @@ func (h *spyCompactionHandler) enqueueCompaction(task *datapb.CompactionTask) er
return err
}
func (h *spyCompactionHandler) checkAndSetSegmentStating(segmentID int64) bool {
return false
}
// isFull return true if the task pool is full
func (h *spyCompactionHandler) isFull() bool {
return false

View File

@ -65,6 +65,8 @@ type CompactionMeta interface {
CheckAndSetSegmentsCompacting(ctx context.Context, segmentIDs []int64) (bool, bool)
CompleteCompactionMutation(ctx context.Context, t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)
CleanPartitionStatsInfo(ctx context.Context, info *datapb.PartitionStatsInfo) error
CheckSegmentsStating(ctx context.Context, segmentID []UniqueID) (bool, bool)
SetSegmentStating(segmentID UniqueID, stating bool)
SaveCompactionTask(ctx context.Context, task *datapb.CompactionTask) error
DropCompactionTask(ctx context.Context, task *datapb.CompactionTask) error
@ -1448,6 +1450,31 @@ func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) {
m.segments.SetFlushTime(segmentID, t)
}
func (m *meta) CheckSegmentsStating(ctx context.Context, segmentIDs []UniqueID) (exist bool, hasStating bool) {
m.RLock()
defer m.RUnlock()
exist = true
for _, segmentID := range segmentIDs {
seg := m.segments.GetSegment(segmentID)
if seg != nil {
if seg.isStating {
hasStating = true
}
} else {
exist = false
break
}
}
return exist, hasStating
}
func (m *meta) SetSegmentStating(segmentID UniqueID, stating bool) {
m.Lock()
defer m.Unlock()
m.segments.SetIsStating(segmentID, stating)
}
// SetSegmentCompacting sets compaction state for segment
func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
m.Lock()

View File

@ -79,6 +79,63 @@ func (_c *MockCompactionMeta_CheckAndSetSegmentsCompacting_Call) RunAndReturn(ru
return _c
}
// CheckSegmentsStating provides a mock function with given fields: ctx, segmentID
func (_m *MockCompactionMeta) CheckSegmentsStating(ctx context.Context, segmentID []int64) (bool, bool) {
ret := _m.Called(ctx, segmentID)
if len(ret) == 0 {
panic("no return value specified for CheckSegmentsStating")
}
var r0 bool
var r1 bool
if rf, ok := ret.Get(0).(func(context.Context, []int64) (bool, bool)); ok {
return rf(ctx, segmentID)
}
if rf, ok := ret.Get(0).(func(context.Context, []int64) bool); ok {
r0 = rf(ctx, segmentID)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(context.Context, []int64) bool); ok {
r1 = rf(ctx, segmentID)
} else {
r1 = ret.Get(1).(bool)
}
return r0, r1
}
// MockCompactionMeta_CheckSegmentsStating_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckSegmentsStating'
type MockCompactionMeta_CheckSegmentsStating_Call struct {
*mock.Call
}
// CheckSegmentsStating is a helper method to define mock.On call
// - ctx context.Context
// - segmentID []int64
func (_e *MockCompactionMeta_Expecter) CheckSegmentsStating(ctx interface{}, segmentID interface{}) *MockCompactionMeta_CheckSegmentsStating_Call {
return &MockCompactionMeta_CheckSegmentsStating_Call{Call: _e.mock.On("CheckSegmentsStating", ctx, segmentID)}
}
func (_c *MockCompactionMeta_CheckSegmentsStating_Call) Run(run func(ctx context.Context, segmentID []int64)) *MockCompactionMeta_CheckSegmentsStating_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]int64))
})
return _c
}
func (_c *MockCompactionMeta_CheckSegmentsStating_Call) Return(_a0 bool, _a1 bool) *MockCompactionMeta_CheckSegmentsStating_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockCompactionMeta_CheckSegmentsStating_Call) RunAndReturn(run func(context.Context, []int64) (bool, bool)) *MockCompactionMeta_CheckSegmentsStating_Call {
_c.Call.Return(run)
return _c
}
// CleanPartitionStatsInfo provides a mock function with given fields: ctx, info
func (_m *MockCompactionMeta) CleanPartitionStatsInfo(ctx context.Context, info *datapb.PartitionStatsInfo) error {
ret := _m.Called(ctx, info)
@ -735,6 +792,40 @@ func (_c *MockCompactionMeta_SelectSegments_Call) RunAndReturn(run func(context.
return _c
}
// SetSegmentStating provides a mock function with given fields: segmentID, stating
func (_m *MockCompactionMeta) SetSegmentStating(segmentID int64, stating bool) {
_m.Called(segmentID, stating)
}
// MockCompactionMeta_SetSegmentStating_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSegmentStating'
type MockCompactionMeta_SetSegmentStating_Call struct {
*mock.Call
}
// SetSegmentStating is a helper method to define mock.On call
// - segmentID int64
// - stating bool
func (_e *MockCompactionMeta_Expecter) SetSegmentStating(segmentID interface{}, stating interface{}) *MockCompactionMeta_SetSegmentStating_Call {
return &MockCompactionMeta_SetSegmentStating_Call{Call: _e.mock.On("SetSegmentStating", segmentID, stating)}
}
func (_c *MockCompactionMeta_SetSegmentStating_Call) Run(run func(segmentID int64, stating bool)) *MockCompactionMeta_SetSegmentStating_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(bool))
})
return _c
}
func (_c *MockCompactionMeta_SetSegmentStating_Call) Return() *MockCompactionMeta_SetSegmentStating_Call {
_c.Call.Return()
return _c
}
func (_c *MockCompactionMeta_SetSegmentStating_Call) RunAndReturn(run func(int64, bool)) *MockCompactionMeta_SetSegmentStating_Call {
_c.Call.Return(run)
return _c
}
// SetSegmentsCompacting provides a mock function with given fields: ctx, segmentID, compacting
func (_m *MockCompactionMeta) SetSegmentsCompacting(ctx context.Context, segmentID []int64, compacting bool) {
_m.Called(ctx, segmentID, compacting)

View File

@ -22,6 +22,52 @@ func (_m *MockCompactionPlanContext) EXPECT() *MockCompactionPlanContext_Expecte
return &MockCompactionPlanContext_Expecter{mock: &_m.Mock}
}
// checkAndSetSegmentStating provides a mock function with given fields: segmentID
func (_m *MockCompactionPlanContext) checkAndSetSegmentStating(segmentID int64) bool {
ret := _m.Called(segmentID)
if len(ret) == 0 {
panic("no return value specified for checkAndSetSegmentStating")
}
var r0 bool
if rf, ok := ret.Get(0).(func(int64) bool); ok {
r0 = rf(segmentID)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockCompactionPlanContext_checkAndSetSegmentStating_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'checkAndSetSegmentStating'
type MockCompactionPlanContext_checkAndSetSegmentStating_Call struct {
*mock.Call
}
// checkAndSetSegmentStating is a helper method to define mock.On call
// - segmentID int64
func (_e *MockCompactionPlanContext_Expecter) checkAndSetSegmentStating(segmentID interface{}) *MockCompactionPlanContext_checkAndSetSegmentStating_Call {
return &MockCompactionPlanContext_checkAndSetSegmentStating_Call{Call: _e.mock.On("checkAndSetSegmentStating", segmentID)}
}
func (_c *MockCompactionPlanContext_checkAndSetSegmentStating_Call) Run(run func(segmentID int64)) *MockCompactionPlanContext_checkAndSetSegmentStating_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockCompactionPlanContext_checkAndSetSegmentStating_Call) Return(_a0 bool) *MockCompactionPlanContext_checkAndSetSegmentStating_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockCompactionPlanContext_checkAndSetSegmentStating_Call) RunAndReturn(run func(int64) bool) *MockCompactionPlanContext_checkAndSetSegmentStating_Call {
_c.Call.Return(run)
return _c
}
// enqueueCompaction provides a mock function with given fields: task
func (_m *MockCompactionPlanContext) enqueueCompaction(task *datapb.CompactionTask) error {
ret := _m.Called(task)
@ -241,6 +287,39 @@ func (_c *MockCompactionPlanContext_removeTasksByChannel_Call) RunAndReturn(run
return _c
}
// setTaskScheduler provides a mock function with given fields: scheduler
func (_m *MockCompactionPlanContext) setTaskScheduler(scheduler *taskScheduler) {
_m.Called(scheduler)
}
// MockCompactionPlanContext_setTaskScheduler_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'setTaskScheduler'
type MockCompactionPlanContext_setTaskScheduler_Call struct {
*mock.Call
}
// setTaskScheduler is a helper method to define mock.On call
// - scheduler *taskScheduler
func (_e *MockCompactionPlanContext_Expecter) setTaskScheduler(scheduler interface{}) *MockCompactionPlanContext_setTaskScheduler_Call {
return &MockCompactionPlanContext_setTaskScheduler_Call{Call: _e.mock.On("setTaskScheduler", scheduler)}
}
func (_c *MockCompactionPlanContext_setTaskScheduler_Call) Run(run func(scheduler *taskScheduler)) *MockCompactionPlanContext_setTaskScheduler_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*taskScheduler))
})
return _c
}
func (_c *MockCompactionPlanContext_setTaskScheduler_Call) Return() *MockCompactionPlanContext_setTaskScheduler_Call {
_c.Call.Return()
return _c
}
func (_c *MockCompactionPlanContext_setTaskScheduler_Call) RunAndReturn(run func(*taskScheduler)) *MockCompactionPlanContext_setTaskScheduler_Call {
_c.Call.Return(run)
return _c
}
// start provides a mock function with given fields:
func (_m *MockCompactionPlanContext) start() {
_m.Called()

View File

@ -56,6 +56,9 @@ type SegmentInfo struct {
size atomic.Int64
deltaRowcount atomic.Int64
lastWrittenTime time.Time
// It is only to ensure mutual exclusion between L0 compacting and stats tasks
isStating bool
}
// NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo`
@ -278,6 +281,13 @@ func (s *SegmentsInfo) SetIsCompacting(segmentID UniqueID, isCompacting bool) {
}
}
// SetIsStating sets stating status for segment
func (s *SegmentsInfo) SetIsStating(segmentID UniqueID, isStating bool) {
if segment, ok := s.segments[segmentID]; ok {
s.segments[segmentID] = segment.ShadowClone(SetIsStating(isStating))
}
}
func (s *SegmentInfo) IsDeltaLogExists(logID int64) bool {
for _, deltaLogs := range s.GetDeltalogs() {
for _, l := range deltaLogs.GetBinlogs() {
@ -465,6 +475,13 @@ func SetIsCompacting(isCompacting bool) SegmentInfoOption {
}
}
// SetIsStating is the option to set stats state for segment info
func SetIsStating(isStating bool) SegmentInfoOption {
return func(segment *SegmentInfo) {
segment.isStating = isStating
}
}
// SetLevel is the option to set level for segment info
func SetLevel(level datapb.SegmentLevel) SegmentInfoOption {
return func(segment *SegmentInfo) {

View File

@ -382,15 +382,15 @@ func (s *Server) initDataCoord() error {
}
log.Info("init service discovery done")
s.initCompaction()
log.Info("init compaction done")
s.initTaskScheduler(storageCli)
log.Info("init task scheduler done")
s.initJobManager()
log.Info("init statsJobManager done")
s.initCompaction()
log.Info("init compaction done")
if err = s.initSegmentManager(); err != nil {
return err
}
@ -685,7 +685,8 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
func (s *Server) initTaskScheduler(manager storage.ChunkManager) {
if s.taskScheduler == nil {
s.taskScheduler = newTaskScheduler(s.ctx, s.meta, s.indexNodeManager, manager, s.indexEngineVersionManager, s.handler, s.allocator)
s.taskScheduler = newTaskScheduler(s.ctx, s.meta, s.indexNodeManager, manager, s.indexEngineVersionManager, s.handler, s.allocator, s.compactionHandler)
s.compactionHandler.setTaskScheduler(s.taskScheduler)
}
}
@ -702,7 +703,9 @@ func (s *Server) initIndexNodeManager() {
}
func (s *Server) initCompaction() {
s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.taskScheduler, s.handler)
cph := newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.handler)
cph.loadMeta()
s.compactionHandler = cph
s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta)
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
}

View File

@ -1706,7 +1706,7 @@ func TestGetCompactionState(t *testing.T) {
{State: datapb.CompactionTaskState_timeout},
{State: datapb.CompactionTaskState_timeout},
})
mockHandler := newCompactionPlanHandler(nil, nil, mockMeta, nil, nil, nil)
mockHandler := newCompactionPlanHandler(nil, nil, mockMeta, nil, nil)
svr.compactionHandler = mockHandler
resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{CompactionID: 1})
assert.NoError(t, err)

View File

@ -118,7 +118,7 @@ func (at *analyzeTask) GetFailReason() string {
return at.taskInfo.GetFailReason()
}
func (at *analyzeTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error {
func (at *analyzeTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error {
if err := meta.analyzeMeta.UpdateVersion(at.GetTaskID(), nodeID); err != nil {
return err
}

View File

@ -120,7 +120,7 @@ func (it *indexBuildTask) GetFailReason() string {
return it.taskInfo.FailReason
}
func (it *indexBuildTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error {
func (it *indexBuildTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error {
if err := meta.indexMeta.UpdateVersion(it.taskID, nodeID); err != nil {
return err
}

View File

@ -62,6 +62,7 @@ type taskScheduler struct {
indexEngineVersionManager IndexEngineVersionManager
handler Handler
allocator allocator.Allocator
compactionHandler compactionPlanContext
taskStats *expirable.LRU[UniqueID, Task]
}
@ -73,6 +74,7 @@ func newTaskScheduler(
indexEngineVersionManager IndexEngineVersionManager,
handler Handler,
allocator allocator.Allocator,
compactionHandler compactionPlanContext,
) *taskScheduler {
ctx, cancel := context.WithCancel(ctx)
@ -92,6 +94,7 @@ func newTaskScheduler(
indexEngineVersionManager: indexEngineVersionManager,
allocator: allocator,
taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15),
compactionHandler: compactionHandler,
}
ts.reloadFromMeta()
return ts
@ -154,17 +157,30 @@ func (s *taskScheduler) reloadFromMeta() {
for taskID, t := range allStatsTasks {
if t.GetState() != indexpb.JobState_JobStateFinished && t.GetState() != indexpb.JobState_JobStateFailed {
if t.GetState() == indexpb.JobState_JobStateInProgress || t.GetState() == indexpb.JobState_JobStateRetry {
exist, canDo := s.meta.CheckAndSetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()})
if !exist || !canDo {
log.Ctx(s.ctx).Warn("segment is not exist or is compacting, skip stats, but this should not have happened, try to remove the stats task",
zap.Int64("taskID", taskID), zap.Bool("exist", exist), zap.Bool("canDo", canDo))
err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
if err == nil {
continue
if t.GetState() == indexpb.JobState_JobStateInProgress || t.GetState() == indexpb.JobState_JobStateRetry {
exist, canDo := s.meta.CheckAndSetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()})
if !exist || !canDo {
log.Ctx(s.ctx).Warn("segment is not exist or is compacting, skip stats, but this should not have happened, try to remove the stats task",
zap.Int64("taskID", taskID), zap.Bool("exist", exist), zap.Bool("canDo", canDo))
err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
if err == nil {
continue
}
log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
t.State = indexpb.JobState_JobStateFailed
t.FailReason = "segment is not exist or is compacting"
} else {
if !s.compactionHandler.checkAndSetSegmentStating(t.GetSegmentID()) {
s.meta.SetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()}, false)
err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
if err == nil {
continue
}
log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
t.State = indexpb.JobState_JobStateFailed
t.FailReason = "segment is not exist or is l0 compacting"
}
}
log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
t.State = indexpb.JobState_JobStateFailed
t.FailReason = "segment is nto exist or is compacting"
}
}
s.enqueue(&statsTask{
@ -403,7 +419,7 @@ func (s *taskScheduler) processInit(task Task) bool {
log.Ctx(s.ctx).Info("pick client success", zap.Int64("taskID", task.GetTaskID()), zap.Int64("nodeID", nodeID))
// 2. update version
if err := task.UpdateVersion(s.ctx, nodeID, s.meta); err != nil {
if err := task.UpdateVersion(s.ctx, nodeID, s.meta, s.compactionHandler); err != nil {
log.Ctx(s.ctx).Warn("update task version failed", zap.Int64("taskID", task.GetTaskID()), zap.Error(err))
return false
}

View File

@ -853,7 +853,7 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
cm := mocks.NewChunkManager(s.T())
cm.EXPECT().RootPath().Return("root")
scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil)
scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil)
s.Equal(9, len(scheduler.tasks))
s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[1].GetState())
s.Equal(indexpb.JobState_JobStateInProgress, scheduler.tasks[2].GetState())
@ -1000,7 +1000,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
}))
handler := NewNMockHandler(s.T())
scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil)
scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil, nil)
mt.segments.DropSegment(1000)
scheduler.scheduleDuration = s.duration
@ -1060,7 +1060,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
},
}, nil)
scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil)
scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil, nil)
// remove task in meta
err := scheduler.meta.analyzeMeta.DropAnalyzeTask(context.TODO(), 1)
@ -1341,7 +1341,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
cm.EXPECT().RootPath().Return("ut-index")
handler := NewNMockHandler(s.T())
scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil)
scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil)
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("True")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("False")
@ -1615,7 +1615,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
scheduler := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil)
scheduler := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil)
waitTaskDoneFunc := func(sche *taskScheduler) {
for {
@ -1854,7 +1854,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
handler_isolation := NewNMockHandler(s.T())
handler_isolation.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(isoCollInfo, nil)
scheduler_isolation := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler_isolation, nil)
scheduler_isolation := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler_isolation, nil, nil)
scheduler_isolation.Start()
s.Run("Submit partitionKeyIsolation is false when MV not enabled", func() {
@ -1952,7 +1952,9 @@ func (s *taskSchedulerSuite) Test_reload() {
},
},
}))
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil)
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything).Return(true).Maybe()
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
@ -1986,8 +1988,10 @@ func (s *taskSchedulerSuite) Test_reload() {
},
},
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything).Return(true).Maybe()
mt.segments.segments[1000].isCompacting = true
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil)
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
@ -2021,8 +2025,10 @@ func (s *taskSchedulerSuite) Test_reload() {
},
},
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything).Return(true).Maybe()
mt.segments.segments[1000].isCompacting = true
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil)
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]

View File

@ -79,6 +79,7 @@ func (st *statsTask) ResetTask(mt *meta) {
// reset isCompacting
mt.SetSegmentsCompacting(context.TODO(), []UniqueID{st.segmentID}, false)
mt.SetSegmentStating(st.segmentID, false)
}
func (st *statsTask) SetQueueTime(t time.Time) {
@ -127,15 +128,26 @@ func (st *statsTask) GetFailReason() string {
return st.taskInfo.GetFailReason()
}
func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error {
func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error {
// mark compacting
if exist, canDo := meta.CheckAndSetSegmentsCompacting(ctx, []UniqueID{st.segmentID}); !exist || !canDo {
log.Warn("segment is not exist or is compacting, skip stats",
zap.Bool("exist", exist), zap.Bool("canDo", canDo))
st.SetState(indexpb.JobState_JobStateNone, "segment is not healthy")
st.SetState(indexpb.JobState_JobStateFailed, "segment is not healthy")
st.SetStartTime(time.Now())
return fmt.Errorf("mark segment compacting failed, isCompacting: %v", !canDo)
}
if !compactionHandler.checkAndSetSegmentStating(st.segmentID) {
log.Warn("segment is contains by l0 compaction, skip stats", zap.Int64("taskID", st.taskID),
zap.Int64("segmentID", st.segmentID))
st.SetState(indexpb.JobState_JobStateFailed, "segment is contains by l0 compaction")
//reset compacting
meta.SetSegmentsCompacting(ctx, []UniqueID{st.segmentID}, false)
st.SetStartTime(time.Now())
return fmt.Errorf("segment is contains by l0 compaction")
}
if err := meta.statsTaskMeta.UpdateVersion(st.taskID, nodeID); err != nil {
return err
}

View File

@ -166,21 +166,33 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() {
s.Run("segment is compacting", func() {
s.mt.segments.segments[s.segID].isCompacting = true
s.Error(st.UpdateVersion(context.Background(), 1, s.mt))
s.Error(st.UpdateVersion(context.Background(), 1, s.mt, nil))
})
s.Run("segment is in l0 compaction", func() {
s.mt.segments.segments[s.segID].isCompacting = false
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything).Return(false)
s.Error(st.UpdateVersion(context.Background(), 1, s.mt, compactionHandler))
s.False(s.mt.segments.segments[s.segID].isCompacting)
})
s.Run("normal case", func() {
s.mt.segments.segments[s.segID].isCompacting = false
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything).Return(true)
catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil).Once()
s.NoError(st.UpdateVersion(context.Background(), 1, s.mt))
s.NoError(st.UpdateVersion(context.Background(), 1, s.mt, compactionHandler))
})
s.Run("failed case", func() {
s.mt.segments.segments[s.segID].isCompacting = false
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything).Return(true)
catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(fmt.Errorf("error")).Once()
s.Error(st.UpdateVersion(context.Background(), 1, s.mt))
s.Error(st.UpdateVersion(context.Background(), 1, s.mt, compactionHandler))
})
})

View File

@ -33,7 +33,7 @@ type Task interface {
SetState(state indexpb.JobState, failReason string)
GetState() indexpb.JobState
GetFailReason() string
UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error
UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error
UpdateMetaBuildingState(meta *meta) error
AssignTask(ctx context.Context, client types.IndexNodeClient) bool
QueryResult(ctx context.Context, client types.IndexNodeClient)