fix: Separate schedule and check results loop (#28692)

This PR:

- Separates the compaction scheduler from the check-results loop, so that a
slow check loop doesn't stall plan execution (a sketch of the pattern follows below).

- Cleans up compaction tasks when a vchannel is dropped, so the dropped
channel's compaction tasks aren't checked over and over again.

  - Skips the meta change when the meta has already been changed, to avoid a panic
  - Removes the unused bool parameter from injectDone

See also: #28628, #28209
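
A minimal, runnable sketch of the two-loop pattern this change moves to. All
names and tick intervals below are simplified stand-ins for illustration, not
the real datacoord types; the actual loops live in compactionPlanHandler.start
in the diff that follows.

package main

import (
	"fmt"
	"sync"
	"time"
)

type handler struct {
	stopCh   chan struct{}
	stopOnce sync.Once
	stopWg   sync.WaitGroup
}

func (h *handler) start() {
	h.stopCh = make(chan struct{})
	h.stopWg.Add(2)

	// Loop 1: check results. May be slow (RPCs, meta updates).
	go func() {
		defer h.stopWg.Done()
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-h.stopCh:
				return
			case <-ticker.C:
				time.Sleep(50 * time.Millisecond) // simulate a slow check
			}
		}
	}()

	// Loop 2: schedule queuing tasks. It stays responsive because it no
	// longer shares a loop with the slow check above.
	go func() {
		defer h.stopWg.Done()
		ticker := time.NewTicker(20 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-h.stopCh:
				return
			case <-ticker.C:
				fmt.Println("schedule tick")
			}
		}
	}()
}

func (h *handler) stop() {
	h.stopOnce.Do(func() { close(h.stopCh) }) // idempotent, as in the diff
	h.stopWg.Wait()
}

func main() {
	h := &handler{}
	h.start()
	time.Sleep(200 * time.Millisecond)
	h.stop()
}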

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/28878/head
XuanYang-cn 2023-11-29 10:50:29 +08:00 committed by GitHub
parent 845851ea1c
commit 321c5c32e3
12 changed files with 173 additions and 193 deletions

View File

@@ -52,6 +52,7 @@ type compactionPlanContext interface {
isFull() bool
// get compaction tasks by signal id
getCompactionTasksBySignalID(signalID int64) []*compactionTask
removeTasksByChannel(channel string)
}
type compactionTaskState int8
@@ -93,15 +94,18 @@ func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask
var _ compactionPlanContext = (*compactionPlanHandler)(nil)
type compactionPlanHandler struct {
plans map[int64]*compactionTask // planID -> task
sessions *SessionManager
mu sync.RWMutex
plans map[int64]*compactionTask // planID -> task
meta *meta
chManager *ChannelManager
mu sync.RWMutex
allocator allocator
quit chan struct{}
wg sync.WaitGroup
chManager *ChannelManager
sessions *SessionManager
scheduler *CompactionScheduler
stopCh chan struct{}
stopOnce sync.Once
stopWg sync.WaitGroup
}
func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta *meta, allocator allocator,
@@ -118,20 +122,18 @@ func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta
func (c *compactionPlanHandler) start() {
interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second)
c.quit = make(chan struct{})
c.wg.Add(1)
c.stopCh = make(chan struct{})
c.stopWg.Add(2)
go func() {
defer c.wg.Done()
defer c.stopWg.Done()
checkResultTicker := time.NewTicker(interval)
scheduleTicker := time.NewTicker(200 * time.Millisecond)
log.Info("compaction handler start", zap.Any("check result interval", interval))
log.Info("Compaction handler check result loop start", zap.Any("check result interval", interval))
defer checkResultTicker.Stop()
defer scheduleTicker.Stop()
for {
select {
case <-c.quit:
log.Info("compaction handler quit")
case <-c.stopCh:
log.Info("compaction handler check result loop quit")
return
case <-checkResultTicker.C:
// deal results
@@ -144,6 +146,22 @@ func (c *compactionPlanHandler) start() {
}
cancel()
_ = c.updateCompaction(ts)
}
}
}()
// separate the check-results loop and the schedule goroutine so that slow
// result checking doesn't influence the schedule
go func() {
defer c.stopWg.Done()
scheduleTicker := time.NewTicker(200 * time.Millisecond)
defer scheduleTicker.Stop()
log.Info("compaction handler start schedule")
for {
select {
case <-c.stopCh:
log.Info("Compaction handler quit schedule")
return
case <-scheduleTicker.C:
// schedule queuing tasks
@@ -159,8 +177,21 @@ func (c *compactionPlanHandler) start() {
}
func (c *compactionPlanHandler) stop() {
close(c.quit)
c.wg.Wait()
c.stopOnce.Do(func() {
close(c.stopCh)
})
c.stopWg.Wait()
}
func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
c.mu.Lock()
defer c.mu.Unlock()
for id, task := range c.plans {
if task.triggerInfo.channel == channel {
c.scheduler.finish(task.dataNodeID, task.plan.PlanID)
delete(c.plans, id)
}
}
}
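
For illustration, a hedged in-package test sketch of the cleanup above. The
constructor name and the assert import are assumptions; the task fields come
from the loop in removeTasksByChannel:

func TestRemoveTasksByChannel(t *testing.T) {
	handler := &compactionPlanHandler{
		scheduler: NewCompactionScheduler(), // assumed constructor from scheduler.go
		plans: map[int64]*compactionTask{
			1: {
				triggerInfo: &compactionSignal{channel: "ch-1"},
				plan:        &datapb.CompactionPlan{PlanID: 1},
				dataNodeID:  100,
			},
		},
	}
	handler.removeTasksByChannel("ch-1") // finishes the task in the scheduler and drops it
	assert.Empty(t, handler.plans)
}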
func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskOpt) {
@@ -298,38 +329,53 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionPlan
}
func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) error {
// Also prepare metric updates.
_, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan, result)
if err != nil {
return err
}
log := log.With(zap.Int64("planID", plan.GetPlanID()))
if len(result.GetSegments()) == 0 || len(result.GetSegments()) > 1 {
// should never happen
log.Warn("illegal compaction results")
return fmt.Errorf("Illegal compaction results: %v", result)
}
if err := c.meta.alterMetaStoreAfterCompaction(newSegment, modSegments); err != nil {
log.Warn("fail to alert meta store", zap.Error(err))
return err
// Merge compaction has one and only one segment
newSegmentInfo := c.meta.GetHealthySegment(result.GetSegments()[0].SegmentID)
if newSegmentInfo != nil {
log.Info("meta has already been changed, skip meta change and retry sync segments")
} else {
// Also prepare metric updates.
_, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan, result)
if err != nil {
return err
}
if err := c.meta.alterMetaStoreAfterCompaction(newSegment, modSegments); err != nil {
log.Warn("fail to alert meta store", zap.Error(err))
return err
}
// Apply metrics after successful meta update.
metricMutation.commit()
newSegmentInfo = newSegment
}
nodeID := c.plans[plan.GetPlanID()].dataNodeID
req := &datapb.SyncSegmentsRequest{
PlanID: plan.PlanID,
CompactedTo: newSegment.GetID(),
CompactedFrom: newSegment.GetCompactionFrom(),
NumOfRows: newSegment.GetNumOfRows(),
StatsLogs: newSegment.GetStatslogs(),
CompactedTo: newSegmentInfo.GetID(),
CompactedFrom: newSegmentInfo.GetCompactionFrom(),
NumOfRows: newSegmentInfo.GetNumOfRows(),
StatsLogs: newSegmentInfo.GetStatslogs(),
ChannelName: plan.GetChannel(),
PartitionId: newSegment.GetPartitionID(),
CollectionId: newSegment.GetCollectionID(),
PartitionId: newSegmentInfo.GetPartitionID(),
CollectionId: newSegmentInfo.GetCollectionID(),
}
log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
if err := c.sessions.SyncSegments(nodeID, req); err != nil {
log.Warn("handleCompactionResult: fail to sync segments with node, reverting metastore",
log.Warn("handleCompactionResult: fail to sync segments with node",
zap.Int64("nodeID", nodeID), zap.Error(err))
return err
}
// Apply metrics after successful meta update.
metricMutation.commit()
log.Info("handleCompactionResult: success to handle merge compaction result")
return nil
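
The early lookup above makes the whole function retry-safe: when the meta
change already landed but a later step failed, a retry skips the mutation and
goes straight to SyncSegments. Distilled, the pattern is as follows
(buildSyncReq is a hypothetical helper standing in for the request
construction above):

// idempotent apply-then-notify: look the result up first, mutate meta only on a miss
newSeg := c.meta.GetHealthySegment(result.GetSegments()[0].SegmentID)
if newSeg == nil {
	_, mod, created, mutation, err := c.meta.PrepareCompleteCompactionMutation(plan, result)
	if err != nil {
		return err
	}
	if err := c.meta.alterMetaStoreAfterCompaction(created, mod); err != nil {
		return err
	}
	mutation.commit() // apply metrics only after the meta update sticks
	newSeg = created
}
// retries re-enter here and only redo the SyncSegments RPC
return c.sessions.SyncSegments(nodeID, buildSyncReq(plan, newSeg))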

View File

@@ -24,7 +24,7 @@ type Scheduler interface {
type CompactionScheduler struct {
taskNumber *atomic.Int32
queuingTasks []*compactionTask
parallelTasks map[int64][]*compactionTask
parallelTasks map[int64][]*compactionTask // parallel by nodeID
mu sync.RWMutex
planHandler *compactionPlanHandler

View File

@@ -38,6 +38,10 @@ type spyCompactionHandler struct {
spyChan chan *datapb.CompactionPlan
}
var _ compactionPlanContext = (*spyCompactionHandler)(nil)
func (h *spyCompactionHandler) removeTasksByChannel(channel string) {}
// execCompactionPlan starts executing the plan and returns immediately
func (h *spyCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
h.spyChan <- plan

View File

@@ -192,6 +192,39 @@ func (_c *MockCompactionPlanContext_isFull_Call) RunAndReturn(run func() bool) *
return _c
}
// removeTasksByChannel provides a mock function with given fields: channel
func (_m *MockCompactionPlanContext) removeTasksByChannel(channel string) {
_m.Called(channel)
}
// MockCompactionPlanContext_removeTasksByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'removeTasksByChannel'
type MockCompactionPlanContext_removeTasksByChannel_Call struct {
*mock.Call
}
// removeTasksByChannel is a helper method to define mock.On call
// - channel string
func (_e *MockCompactionPlanContext_Expecter) removeTasksByChannel(channel interface{}) *MockCompactionPlanContext_removeTasksByChannel_Call {
return &MockCompactionPlanContext_removeTasksByChannel_Call{Call: _e.mock.On("removeTasksByChannel", channel)}
}
func (_c *MockCompactionPlanContext_removeTasksByChannel_Call) Run(run func(channel string)) *MockCompactionPlanContext_removeTasksByChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockCompactionPlanContext_removeTasksByChannel_Call) Return() *MockCompactionPlanContext_removeTasksByChannel_Call {
_c.Call.Return()
return _c
}
func (_c *MockCompactionPlanContext_removeTasksByChannel_Call) RunAndReturn(run func(string)) *MockCompactionPlanContext_removeTasksByChannel_Call {
_c.Call.Return(run)
return _c
}
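
A short sketch of how the generated expecter reads in a test; this is the same
pattern the rewritten tests below use:

mockHandler := NewMockCompactionPlanContext(t)
mockHandler.EXPECT().removeTasksByChannel("ch-1").Return()
mockHandler.removeTasksByChannel("ch-1") // expectation asserted via t.Cleanup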
// start provides a mock function with given fields:
func (_m *MockCompactionPlanContext) start() {
_m.Called()

View File

@@ -572,90 +572,6 @@ func (m *mockRootCoordClient) ReportImport(ctx context.Context, req *rootcoordpb
return merr.Success(), nil
}
type mockCompactionHandler struct {
methods map[string]interface{}
}
func (h *mockCompactionHandler) start() {
if f, ok := h.methods["start"]; ok {
if ff, ok := f.(func()); ok {
ff()
return
}
}
panic("not implemented")
}
func (h *mockCompactionHandler) stop() {
if f, ok := h.methods["stop"]; ok {
if ff, ok := f.(func()); ok {
ff()
return
}
}
panic("not implemented")
}
// execCompactionPlan start to execute plan and return immediately
func (h *mockCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
if f, ok := h.methods["execCompactionPlan"]; ok {
if ff, ok := f.(func(signal *compactionSignal, plan *datapb.CompactionPlan) error); ok {
return ff(signal, plan)
}
}
panic("not implemented")
}
// // completeCompaction record the result of a compaction
// func (h *mockCompactionHandler) completeCompaction(result *datapb.CompactionResult) error {
// if f, ok := h.methods["completeCompaction"]; ok {
// if ff, ok := f.(func(result *datapb.CompactionResult) error); ok {
// return ff(result)
// }
// }
// panic("not implemented")
// }
// getCompaction return compaction task. If planId does not exist, return nil.
func (h *mockCompactionHandler) getCompaction(planID int64) *compactionTask {
if f, ok := h.methods["getCompaction"]; ok {
if ff, ok := f.(func(planID int64) *compactionTask); ok {
return ff(planID)
}
}
panic("not implemented")
}
// expireCompaction set the compaction state to expired
func (h *mockCompactionHandler) updateCompaction(ts Timestamp) error {
if f, ok := h.methods["expireCompaction"]; ok {
if ff, ok := f.(func(ts Timestamp) error); ok {
return ff(ts)
}
}
panic("not implemented")
}
// isFull return true if the task pool is full
func (h *mockCompactionHandler) isFull() bool {
if f, ok := h.methods["isFull"]; ok {
if ff, ok := f.(func() bool); ok {
return ff()
}
}
panic("not implemented")
}
// get compaction tasks by signal id
func (h *mockCompactionHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask {
if f, ok := h.methods["getCompactionTasksBySignalID"]; ok {
if ff, ok := f.(func(signalID int64) []*compactionTask); ok {
return ff(signalID)
}
}
panic("not implemented")
}
type mockCompactionTrigger struct {
methods map[string]interface{}
}

View File

@@ -3063,15 +3063,10 @@ func TestGetCompactionState(t *testing.T) {
svr := &Server{}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.compactionHandler = &mockCompactionHandler{
methods: map[string]interface{}{
"getCompactionTasksBySignalID": func(signalID int64) []*compactionTask {
return []*compactionTask{
{state: completed},
}
},
},
}
mockHandler := NewMockCompactionPlanContext(t)
mockHandler.EXPECT().getCompactionTasksBySignalID(mock.Anything).Return(
[]*compactionTask{{state: completed}})
svr.compactionHandler = mockHandler
resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{})
assert.NoError(t, err)
@@ -3082,24 +3077,21 @@ func TestGetCompactionState(t *testing.T) {
svr := &Server{}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.compactionHandler = &mockCompactionHandler{
methods: map[string]interface{}{
"getCompactionTasksBySignalID": func(signalID int64) []*compactionTask {
return []*compactionTask{
{state: executing},
{state: executing},
{state: executing},
{state: completed},
{state: completed},
{state: failed, plan: &datapb.CompactionPlan{PlanID: 1}},
{state: timeout, plan: &datapb.CompactionPlan{PlanID: 2}},
{state: timeout},
{state: timeout},
{state: timeout},
}
},
},
}
mockHandler := NewMockCompactionPlanContext(t)
mockHandler.EXPECT().getCompactionTasksBySignalID(mock.Anything).Return(
[]*compactionTask{
{state: executing},
{state: executing},
{state: executing},
{state: completed},
{state: completed},
{state: failed, plan: &datapb.CompactionPlan{PlanID: 1}},
{state: timeout, plan: &datapb.CompactionPlan{PlanID: 2}},
{state: timeout},
{state: timeout},
{state: timeout},
})
svr.compactionHandler = mockHandler
resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{CompactionID: 1})
assert.NoError(t, err)
@@ -3187,18 +3179,15 @@ func TestGetCompactionStateWithPlans(t *testing.T) {
svr := &Server{}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.compactionHandler = &mockCompactionHandler{
methods: map[string]interface{}{
"getCompactionTasksBySignalID": func(signalID int64) []*compactionTask {
return []*compactionTask{
{
triggerInfo: &compactionSignal{id: 1},
state: executing,
},
}
mockHandler := NewMockCompactionPlanContext(t)
mockHandler.EXPECT().getCompactionTasksBySignalID(mock.Anything).Return(
[]*compactionTask{
{
triggerInfo: &compactionSignal{id: 1},
state: executing,
},
},
}
})
svr.compactionHandler = mockHandler
resp, err := svr.GetCompactionStateWithPlans(context.TODO(), &milvuspb.GetCompactionPlansRequest{
CompactionID: 1,
@@ -3211,19 +3200,6 @@ func TestGetCompactionStateWithPlans(t *testing.T) {
t.Run("test get compaction state with closed server", func(t *testing.T) {
svr := &Server{}
svr.stateCode.Store(commonpb.StateCode_Abnormal)
svr.compactionHandler = &mockCompactionHandler{
methods: map[string]interface{}{
"getCompactionTasksBySignalID": func(signalID int64) []*compactionTask {
return []*compactionTask{
{
triggerInfo: &compactionSignal{id: 1},
state: executing,
},
}
},
},
}
resp, err := svr.GetCompactionStateWithPlans(context.TODO(), &milvuspb.GetCompactionPlansRequest{
CompactionID: 1,
})

View File

@@ -589,6 +589,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
log.Warn("DropVChannel failed to ReleaseAndRemove", zap.String("channel", channel), zap.Error(err))
}
s.segmentManager.DropSegmentsOfChannel(ctx, channel)
s.compactionHandler.removeTasksByChannel(channel)
metrics.CleanupDataCoordNumStoredRows(collectionID)
metrics.DataCoordCheckpointLag.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel)

View File

@@ -63,11 +63,11 @@ func (c *compactionExecutor) toCompleteState(task compactor) {
c.executing.GetAndRemove(task.getPlanID())
}
func (c *compactionExecutor) injectDone(planID UniqueID, success bool) {
func (c *compactionExecutor) injectDone(planID UniqueID) {
c.completed.GetAndRemove(planID)
task, loaded := c.completedCompactor.GetAndRemove(planID)
if loaded {
task.injectDone(success)
task.injectDone()
}
}
@@ -116,34 +116,37 @@ func (c *compactionExecutor) stopTask(planID UniqueID) {
}
}
func (c *compactionExecutor) channelValidateForCompaction(vChannelName string) bool {
func (c *compactionExecutor) isValidChannel(channel string) bool {
// if vchannel marked dropped, compaction should not proceed
return !c.dropped.Contain(vChannelName)
return !c.dropped.Contain(channel)
}
func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string) {
c.dropped.Insert(vChannelName)
func (c *compactionExecutor) clearTasksByChannel(channel string) {
c.dropped.Insert(channel)
// stop executing tasks of channel
c.executing.Range(func(planID int64, task compactor) bool {
if task.getChannelName() == vChannelName {
if task.getChannelName() == channel {
c.stopTask(planID)
}
return true
})
// remove all completed plans for vChannelName
// remove all completed plans of channel
c.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool {
if result.GetChannel() == vChannelName {
c.injectDone(planID, true)
if result.GetChannel() == channel {
c.injectDone(planID)
log.Info("remove compaction results for dropped channel",
zap.String("channel", vChannelName),
zap.String("channel", channel),
zap.Int64("planID", planID))
}
return true
})
}
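
Note the ordering above: the channel enters the dropped set before running
tasks are stopped, so clearTasksByChannel also acts as a barrier against plans
racing in. A hedged usage sketch (in-package; the constructor appears in this
package's tests):

ex := newCompactionExecutor()
ex.clearTasksByChannel("ch-1")
valid := ex.isValidChannel("ch-1") // false: new plans for ch-1 are refused
_ = valid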
func (c *compactionExecutor) getAllCompactionPlanResult() []*datapb.CompactionPlanResult {
func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanResult {
results := make([]*datapb.CompactionPlanResult, 0)
// get executing results
c.executing.Range(func(planID int64, task compactor) bool {
results = append(results, &datapb.CompactionPlanResult{
State: commonpb.CompactionState_Executing,
@@ -152,6 +155,7 @@ func (c *compactionExecutor) getAllCompactionPlanResult() []*datapb.CompactionPl
return true
})
// get completed results
c.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool {
results = append(results, result)
return true

View File

@@ -83,10 +83,10 @@ func TestCompactionExecutor(t *testing.T) {
{expected: false, channel: "ch2", desc: "in dropped"},
}
ex := newCompactionExecutor()
ex.stopExecutingtaskByVChannelName("ch2")
ex.clearTasksByChannel("ch2")
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
assert.Equal(t, test.expected, ex.channelValidateForCompaction(test.channel))
assert.Equal(t, test.expected, ex.isValidChannel(test.channel))
})
}
})
@@ -107,7 +107,7 @@ func TestCompactionExecutor(t *testing.T) {
found = ex.executing.Contain(mc.getPlanID())
}
ex.stopExecutingtaskByVChannelName("mock")
ex.clearTasksByChannel("mock")
select {
case <-mc.ctx.Done():
@@ -142,8 +142,7 @@ func (mc *mockCompactor) complete() {
mc.done <- struct{}{}
}
func (mc *mockCompactor) injectDone(success bool) {
}
func (mc *mockCompactor) injectDone() {}
func (mc *mockCompactor) compact() (*datapb.CompactionPlanResult, error) {
if !mc.isvalid {

View File

@@ -56,7 +56,7 @@ type compactor interface {
complete()
// compact() (*datapb.CompactionResult, error)
compact() (*datapb.CompactionPlanResult, error)
injectDone(success bool)
injectDone()
stop()
getPlanID() UniqueID
getCollection() UniqueID
@@ -118,7 +118,7 @@ func (t *compactionTask) complete() {
func (t *compactionTask) stop() {
t.cancel()
<-t.done
t.injectDone(true)
t.injectDone()
}
func (t *compactionTask) getPlanID() UniqueID {
@@ -629,7 +629,7 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
return planResult, nil
}
func (t *compactionTask) injectDone(success bool) {
func (t *compactionTask) injectDone() {
for _, binlog := range t.plan.SegmentBinlogs {
t.syncMgr.Unblock(binlog.SegmentID)
}

View File

@@ -154,7 +154,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
ddn.dropMode.Store(true)
log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vChannelName))
ddn.compactionExecutor.stopExecutingtaskByVChannelName(ddn.vChannelName)
ddn.compactionExecutor.clearTasksByChannel(ddn.vChannelName)
fgMsg.dropCollection = true
pChan := funcutil.ToPhysicalChannel(ddn.vChannelName)

View File

@@ -245,7 +245,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "illegal compaction plan")), nil
}
if !node.compactionExecutor.channelValidateForCompaction(req.GetChannel()) {
if !node.compactionExecutor.isValidChannel(req.GetChannel()) {
log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channelName", req.GetChannel()))
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil
}
@@ -275,7 +275,7 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
Status: merr.Status(err),
}, nil
}
results := node.compactionExecutor.getAllCompactionPlanResult()
results := node.compactionExecutor.getAllCompactionResults()
if len(results) > 0 {
planIDs := lo.Map(results, func(result *datapb.CompactionPlanResult, i int) UniqueID {
@@ -310,6 +310,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannelName())
if !ok {
node.compactionExecutor.clearTasksByChannel(req.GetChannelName())
err := merr.WrapErrChannelNotFound(req.GetChannelName())
log.Warn("failed to sync segments", zap.Error(err))
return merr.Status(err), nil
@@ -322,7 +323,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
}
bfs := metacache.NewBloomFilterSet(pks...)
ds.metacache.CompactSegments(req.GetCompactedTo(), req.GetPartitionId(), req.GetNumOfRows(), bfs, req.GetCompactedFrom()...)
node.compactionExecutor.injectDone(req.GetPlanID(), true)
node.compactionExecutor.injectDone(req.GetPlanID())
return merr.Success(), nil
}
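
Taken together, a hedged sketch of the two SyncSegments outcomes this hunk
encodes (request literals assumed; node setup omitted):

// Channel still served: the metacache is compacted, then the plan is released
// via injectDone(planID); the unused success flag is gone from the signature.
// Channel already dropped: the flowgraph lookup fails, the executor's state
// for that channel is cleared, and DataCoord stops re-checking those plans.
status, err := node.SyncSegments(ctx, &datapb.SyncSegmentsRequest{
	PlanID:      1,
	ChannelName: "dropped-ch",
})
// err == nil; status wraps ErrChannelNotFound for the dropped channel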