milvus/internal/datacoord/compaction.go

436 lines
13 KiB
Go
Raw Normal View History

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/zap"
)
// TODO this num should be determined by resources of datanode, for now, we set to a fixed value for simple
// TODO we should split compaction into different priorities, small compaction helps to merge segment, large compaction helps to handle delta and expiration of large segments
const (
maxParallelCompactionTaskNum = 100
rpcCompactionTimeout = 10 * time.Second
)
type compactionPlanContext interface {
start()
stop()
// execCompactionPlan start to execute plan and return immediately
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction(planID int64) *compactionTask
// updateCompaction set the compaction state to timeout or completed
updateCompaction(ts Timestamp) error
// isFull return true if the task pool is full
isFull() bool
// get compaction tasks by signal id
getCompactionTasksBySignalID(signalID int64) []*compactionTask
}
type compactionTaskState int8
const (
executing compactionTaskState = iota + 1
completed
failed
timeout
)
var (
errChannelNotWatched = errors.New("channel is not watched")
errChannelInBuffer = errors.New("channel is in buffer")
)
type compactionTask struct {
triggerInfo *compactionSignal
plan *datapb.CompactionPlan
state compactionTaskState
dataNodeID int64
result *datapb.CompactionResult
}
func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask {
task := &compactionTask{
triggerInfo: t.triggerInfo,
plan: t.plan,
state: t.state,
dataNodeID: t.dataNodeID,
}
for _, opt := range opts {
opt(task)
}
return task
}
var _ compactionPlanContext = (*compactionPlanHandler)(nil)
type compactionPlanHandler struct {
plans map[int64]*compactionTask // planID -> task
sessions *SessionManager
meta *meta
chManager *ChannelManager
mu sync.RWMutex
executingTaskNum int
allocator allocator
quit chan struct{}
wg sync.WaitGroup
flushCh chan UniqueID
segRefer *SegmentReferenceManager
parallelCh map[int64]chan struct{}
}
func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta *meta,
allocator allocator, flush chan UniqueID, segRefer *SegmentReferenceManager) *compactionPlanHandler {
return &compactionPlanHandler{
plans: make(map[int64]*compactionTask),
chManager: cm,
meta: meta,
sessions: sessions,
allocator: allocator,
flushCh: flush,
segRefer: segRefer,
parallelCh: make(map[int64]chan struct{}),
}
}
func (c *compactionPlanHandler) start() {
interval := time.Duration(Params.DataCoordCfg.CompactionCheckIntervalInSeconds) * time.Second
ticker := time.NewTicker(interval)
c.quit = make(chan struct{})
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
select {
case <-c.quit:
ticker.Stop()
log.Info("compaction handler quit")
return
case <-ticker.C:
cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ts, err := c.allocator.allocTimestamp(cctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
cancel()
continue
}
cancel()
_ = c.updateCompaction(ts)
}
}
}()
}
func (c *compactionPlanHandler) stop() {
close(c.quit)
c.wg.Wait()
}
// execCompactionPlan start to execute plan and return immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
c.mu.Lock()
defer c.mu.Unlock()
nodeID, err := c.chManager.FindWatcher(plan.GetChannel())
if err != nil {
log.Error("failed to find watcher",
zap.Int64("plan ID", plan.GetPlanID()),
zap.Error(err))
return err
}
c.setSegmentsCompacting(plan, true)
task := &compactionTask{
triggerInfo: signal,
plan: plan,
state: executing,
dataNodeID: nodeID,
}
c.plans[plan.PlanID] = task
c.executingTaskNum++
go func() {
log.Info("acquire queue", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID()))
c.acquireQueue(nodeID)
ts, err := c.allocator.allocTimestamp(context.TODO())
if err != nil {
log.Warn("Alloc start time for CompactionPlan failed", zap.Int64("planID", plan.GetPlanID()))
return
}
c.mu.Lock()
c.plans[plan.PlanID] = c.plans[plan.PlanID].shadowClone(func(task *compactionTask) {
task.plan.StartTime = ts
})
c.mu.Unlock()
err = c.sessions.Compaction(nodeID, plan)
if err != nil {
log.Warn("Try to Compaction but DataNode rejected", zap.Any("TargetNodeId", nodeID), zap.Any("planId", plan.GetPlanID()))
c.mu.Lock()
delete(c.plans, plan.PlanID)
c.executingTaskNum--
c.releaseQueue(nodeID)
c.mu.Unlock()
return
}
log.Info("start compaction", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID()))
}()
return nil
}
func (c *compactionPlanHandler) setSegmentsCompacting(plan *datapb.CompactionPlan, compacting bool) {
for _, segmentBinlogs := range plan.GetSegmentBinlogs() {
c.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), compacting)
}
}
// complete a compaction task
// not threadsafe, only can be used internally
func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResult) error {
planID := result.PlanID
if _, ok := c.plans[planID]; !ok {
return fmt.Errorf("plan %d is not found", planID)
}
if c.plans[planID].state != executing {
return fmt.Errorf("plan %d's state is %v", planID, c.plans[planID].state)
}
plan := c.plans[planID].plan
switch plan.GetType() {
case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction:
if err := c.handleMergeCompactionResult(plan, result); err != nil {
return err
}
default:
return errors.New("unknown compaction type")
}
c.plans[planID] = c.plans[planID].shadowClone(setState(completed), setResult(result))
c.executingTaskNum--
if c.plans[planID].plan.GetType() == datapb.CompactionType_MergeCompaction ||
c.plans[planID].plan.GetType() == datapb.CompactionType_MixCompaction {
c.flushCh <- result.GetSegmentID()
}
// TODO: when to clean task list
nodeID := c.plans[planID].dataNodeID
c.releaseQueue(nodeID)
return nil
}
func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error {
oldSegments, modSegments, newSegment, err := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result)
if err != nil {
return err
}
log := log.With(zap.Int64("planID", plan.GetPlanID()))
modInfos := make([]*datapb.SegmentInfo, len(modSegments))
for i := range modSegments {
modInfos[i] = modSegments[i].SegmentInfo
}
log.Info("handleCompactionResult: altering metastore after compaction")
if err := c.meta.alterMetaStoreAfterCompaction(modInfos, newSegment.SegmentInfo); err != nil {
log.Warn("handleCompactionResult: fail to alter metastore after compaction", zap.Error(err))
return fmt.Errorf("fail to alter metastore after compaction, err=%w", err)
}
var nodeID = c.plans[plan.GetPlanID()].dataNodeID
req := &datapb.SyncSegmentsRequest{
PlanID: plan.PlanID,
CompactedTo: newSegment.GetID(),
CompactedFrom: newSegment.GetCompactionFrom(),
NumOfRows: newSegment.GetNumOfRows(),
StatsLogs: newSegment.GetStatslogs(),
}
log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
if err := c.sessions.SyncSegments(nodeID, req); err != nil {
log.Warn("handleCompactionResult: fail to sync segments with node, reverting metastore",
zap.Int64("nodeID", nodeID), zap.String("reason", err.Error()))
return c.meta.revertAlterMetaStoreAfterCompaction(oldSegments, newSegment.SegmentInfo)
}
c.meta.alterInMemoryMetaAfterCompaction(newSegment, modSegments)
log.Info("handleCompactionResult: success to handle merge compaction result")
return nil
}
// getCompaction return compaction task. If planId does not exist, return nil.
func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
c.mu.RLock()
defer c.mu.RUnlock()
return c.plans[planID]
}
// expireCompaction set the compaction state to expired
func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
planStates := c.sessions.GetCompactionState()
c.mu.Lock()
defer c.mu.Unlock()
tasks := c.getExecutingCompactions()
for _, task := range tasks {
stateResult, ok := planStates[task.plan.PlanID]
state := stateResult.GetState()
planID := task.plan.PlanID
startTime := task.plan.GetStartTime()
// start time is 0 means this task have not started, skip checker
if startTime == 0 {
continue
}
// check wether the state of CompactionPlan is working
if ok {
if state == commonpb.CompactionState_Completed {
log.Info("compaction completed", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID))
c.completeCompaction(stateResult.GetResult())
continue
}
// check wether the CompactionPlan is timeout
if state == commonpb.CompactionState_Executing && !c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
continue
}
log.Info("compaction timeout",
zap.Int64("planID", task.plan.PlanID),
zap.Int64("nodeID", task.dataNodeID),
zap.Uint64("startTime", task.plan.GetStartTime()),
zap.Uint64("now", ts),
)
c.plans[planID] = c.plans[planID].shadowClone(setState(timeout))
continue
}
log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
c.setSegmentsCompacting(task.plan, false)
c.executingTaskNum--
c.releaseQueue(task.dataNodeID)
}
return nil
}
func (c *compactionPlanHandler) isTimeout(now Timestamp, start Timestamp, timeout int32) bool {
startTime, _ := tsoutil.ParseTS(start)
ts, _ := tsoutil.ParseTS(now)
return int32(ts.Sub(startTime).Seconds()) >= timeout
}
func (c *compactionPlanHandler) acquireQueue(nodeID int64) {
c.mu.Lock()
_, ok := c.parallelCh[nodeID]
if !ok {
c.parallelCh[nodeID] = make(chan struct{}, calculateParallel())
}
c.mu.Unlock()
c.mu.RLock()
ch := c.parallelCh[nodeID]
c.mu.RUnlock()
ch <- struct{}{}
}
func (c *compactionPlanHandler) releaseQueue(nodeID int64) {
log.Info("try to release queue", zap.Int64("nodeID", nodeID))
ch, ok := c.parallelCh[nodeID]
if !ok {
return
}
<-ch
}
// isFull return true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.executingTaskNum >= maxParallelCompactionTaskNum
}
func (c *compactionPlanHandler) getExecutingCompactions() []*compactionTask {
tasks := make([]*compactionTask, 0, len(c.plans))
for _, plan := range c.plans {
if plan.state == executing {
tasks = append(tasks, plan)
}
}
return tasks
}
// get compaction tasks by signal id; if signalID == 0 return all tasks
func (c *compactionPlanHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask {
c.mu.RLock()
defer c.mu.RUnlock()
var tasks []*compactionTask
for _, t := range c.plans {
if signalID == 0 {
tasks = append(tasks, t)
continue
}
if t.triggerInfo.id != signalID {
continue
}
tasks = append(tasks, t)
}
return tasks
}
type compactionTaskOpt func(task *compactionTask)
func setState(state compactionTaskState) compactionTaskOpt {
return func(task *compactionTask) {
task.state = state
}
}
func setResult(result *datapb.CompactionResult) compactionTaskOpt {
return func(task *compactionTask) {
task.result = result
}
}
// 0.5*min(8, NumCPU/2)
func calculateParallel() int {
return 2
//cores := runtime.NumCPU()
//if cores < 16 {
//return 4
//}
//return cores / 2
}