feat: Add Compaction views and triggers (#27906)

- Add Compaction L0 views
- Add Compaction scheduler
- Add Compaction trigger v2
- Add Compaction view manager

See also: #27606
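For orientation, a minimal sketch (not the Milvus code) of how the new pieces cooperate: the view manager builds L0 segment views from meta, the trigger manager checks whether a view qualifies and turns it into a compaction plan, and the plan is handed to the plan handler/scheduler. Every identifier below (segmentView, plan, triggerManager) and the threshold numbers are simplified stand-ins for illustration only.

package main

import "fmt"

// segmentView is a stand-in for datacoord.SegmentView (only delta stats kept).
type segmentView struct {
	ID         int64
	DeltaSize  float64
	DeltaCount int
}

// plan is a stand-in for datapb.CompactionPlan.
type plan struct {
	PlanID   int64
	Segments []int64
}

// triggerManager mirrors the Notify flow of CompactionTriggerManager.
type triggerManager struct{ nextPlanID int64 }

func (t *triggerManager) notify(views []segmentView) *plan {
	// The thresholds play the role of LevelZeroCompactionTriggerMinSize and
	// LevelZeroCompactionTriggerDeltalogMinNum; the numbers are made up.
	const minDeltaSize, minDeltaCount = 8.0, 10
	var size float64
	var count int
	for _, v := range views {
		size += v.DeltaSize
		count += v.DeltaCount
	}
	if size < minDeltaSize && count < minDeltaCount {
		return nil // the view does not qualify, nothing is triggered
	}
	t.nextPlanID++
	p := &plan{PlanID: t.nextPlanID}
	for _, v := range views {
		p.Segments = append(p.Segments, v.ID)
	}
	return p
}

func main() {
	tm := &triggerManager{}
	views := []segmentView{{100, 4, 1}, {101, 4, 1}, {102, 4, 1}}
	if p := tm.notify(views); p != nil {
		fmt.Printf("triggered plan %d on segments %v\n", p.PlanID, p.Segments)
	}
}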

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2023-11-23 17:30:25 +08:00 committed by GitHub
parent 4fedff6d47
commit 9b371067d2
18 changed files with 9552 additions and 459 deletions

View File

@@ -427,6 +427,8 @@ generate-mockery-datacoord: getdeps
$(INSTALL_PATH)/mockery --name=allocator --dir=internal/datacoord --filename=mock_allocator_test.go --output=internal/datacoord --structname=NMockAllocator --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=RWChannelStore --dir=internal/datacoord --filename=mock_channel_store.go --output=internal/datacoord --structname=MockRWChannelStore --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=IndexEngineVersionManager --dir=internal/datacoord --filename=mock_index_engine_version_manager.go --output=internal/datacoord --structname=MockVersionManager --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=TriggerManager --dir=internal/datacoord --filename=mock_trigger_manager.go --output=internal/datacoord --structname=MockTriggerManager --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=compactionPlanContext --dir=internal/datacoord --filename=mock_compaction_plan_context.go --output=internal/datacoord --structname=MockCompactionPlanContext --with-expecter --inpackage
generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage

View File

@@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -32,7 +31,6 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// TODO this number should be determined by the resources of the datanode; for now, we set it to a fixed value for simplicity
@@ -103,158 +101,7 @@ type compactionPlanHandler struct {
allocator allocator
quit chan struct{}
wg sync.WaitGroup
scheduler *scheduler
}
type scheduler struct {
taskNumber *atomic.Int32
queuingTasks []*compactionTask
parallelTasks map[int64][]*compactionTask
mu sync.RWMutex
}
func newScheduler() *scheduler {
return &scheduler{
taskNumber: atomic.NewInt32(0),
queuingTasks: make([]*compactionTask, 0),
parallelTasks: make(map[int64][]*compactionTask),
}
}
// schedule pick 1 or 0 tasks for 1 node
func (s *scheduler) schedule() []*compactionTask {
nodeTasks := make(map[int64][]*compactionTask) // nodeID
s.mu.Lock()
defer s.mu.Unlock()
for _, task := range s.queuingTasks {
if _, ok := nodeTasks[task.dataNodeID]; !ok {
nodeTasks[task.dataNodeID] = make([]*compactionTask, 0)
}
nodeTasks[task.dataNodeID] = append(nodeTasks[task.dataNodeID], task)
}
executable := make(map[int64]*compactionTask)
pickPriorPolicy := func(tasks []*compactionTask, exclusiveChannels []string, executing []string) *compactionTask {
for _, task := range tasks {
if lo.Contains(exclusiveChannels, task.plan.GetChannel()) {
continue
}
if task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
// Channel of LevelZeroCompaction task with no executing compactions
if !lo.Contains(executing, task.plan.GetChannel()) {
return task
}
// Don't schedule any tasks for channel with LevelZeroCompaction task
// when there're executing compactions
exclusiveChannels = append(exclusiveChannels, task.plan.GetChannel())
continue
}
return task
}
return nil
}
// pick 1 or 0 task for 1 node
for node, tasks := range nodeTasks {
parallel := s.parallelTasks[node]
if len(parallel) >= calculateParallel() {
log.Info("Compaction parallel in DataNode reaches the limit", zap.Int64("nodeID", node), zap.Int("parallel", len(parallel)))
continue
}
var (
executing = typeutil.NewSet[string]()
channelsExecPrior = typeutil.NewSet[string]()
)
for _, t := range parallel {
executing.Insert(t.plan.GetChannel())
if t.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
channelsExecPrior.Insert(t.plan.GetChannel())
}
}
picked := pickPriorPolicy(tasks, channelsExecPrior.Collect(), executing.Collect())
if picked != nil {
executable[node] = picked
}
}
var pickPlans []int64
for node, task := range executable {
pickPlans = append(pickPlans, task.plan.PlanID)
if _, ok := s.parallelTasks[node]; !ok {
s.parallelTasks[node] = []*compactionTask{task}
} else {
s.parallelTasks[node] = append(s.parallelTasks[node], task)
}
}
s.queuingTasks = lo.Filter(s.queuingTasks, func(t *compactionTask, _ int) bool {
return !lo.Contains(pickPlans, t.plan.PlanID)
})
// clean parallelTasks with nodes of no running tasks
for node, tasks := range s.parallelTasks {
if len(tasks) == 0 {
delete(s.parallelTasks, node)
}
}
return lo.Values(executable)
}
func (s *scheduler) finish(nodeID, planID UniqueID) {
s.mu.Lock()
if parallel, ok := s.parallelTasks[nodeID]; ok {
tasks := lo.Filter(parallel, func(t *compactionTask, _ int) bool {
return t.plan.PlanID != planID
})
s.parallelTasks[nodeID] = tasks
s.taskNumber.Dec()
}
s.mu.Unlock()
log.Info("Compaction finished", zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
s.logStatus()
}
func (s *scheduler) logStatus() {
s.mu.RLock()
defer s.mu.RUnlock()
waiting := lo.Map(s.queuingTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
})
var executing []int64
for _, tasks := range s.parallelTasks {
executing = append(executing, lo.Map(tasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
})...)
}
if len(waiting) > 0 || len(executing) > 0 {
log.Info("Compaction scheduler status", zap.Int64s("waiting", waiting), zap.Int64s("executing", executing))
}
}
func (s *scheduler) submit(tasks ...*compactionTask) {
s.mu.Lock()
s.queuingTasks = append(s.queuingTasks, tasks...)
s.mu.Unlock()
s.taskNumber.Add(int32(len(tasks)))
s.logStatus()
}
func (s *scheduler) getExecutingTaskNum() int {
return int(s.taskNumber.Load())
scheduler *CompactionScheduler
}
func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta *meta, allocator allocator,
@@ -265,7 +112,7 @@ func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta
meta: meta,
sessions: sessions,
allocator: allocator,
scheduler: newScheduler(),
scheduler: NewCompactionScheduler(),
}
}
@@ -342,15 +189,50 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data
c.plans[plan.PlanID] = task
c.mu.Unlock()
c.scheduler.submit(task)
c.scheduler.Submit(task)
log.Info("Compaction plan submited")
return nil
}
func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) {
plan := task.plan
if plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
sealedSegments := c.meta.SelectSegments(func(info *SegmentInfo) bool {
return info.GetCollectionID() == task.triggerInfo.collectionID &&
info.GetPartitionID() == task.triggerInfo.partitionID &&
info.GetInsertChannel() == plan.GetChannel() &&
isFlushState(info.GetState()) &&
!info.isCompacting &&
!info.GetIsImporting() &&
info.GetLevel() != datapb.SegmentLevel_L0 &&
info.GetDmlPosition().GetTimestamp() < task.triggerInfo.pos.GetTimestamp()
})
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{
SegmentID: info.GetID(),
Level: datapb.SegmentLevel_L1,
}
})
plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...)
return
}
if plan.GetType() == datapb.CompactionType_MixCompaction {
for _, seg := range plan.GetSegmentBinlogs() {
info := c.meta.GetSegment(seg.GetSegmentID())
seg.Deltalogs = info.GetDeltalogs()
}
return
}
}
func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
for _, task := range tasks {
// avoid closure capture iteration variable
innerTask := task
c.RefreshPlan(innerTask)
getOrCreateIOPool().Submit(func() (any, error) {
plan := innerTask.plan
log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", innerTask.dataNodeID))
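For Level0DeleteCompaction plans, RefreshPlan above appends the sealed non-L0 segments of the plan's channel whose dml position is earlier than the trigger position, so the DataNode knows which L1 segments the deletions apply to. A minimal sketch of that selection, where segInfo is a simplified stand-in for SegmentInfo and the real filter additionally checks collectionID, partitionID and the importing flag:

package main

import "fmt"

// segInfo is a simplified stand-in for datacoord.SegmentInfo.
type segInfo struct {
	ID         int64
	Channel    string
	Flushed    bool
	Compacting bool
	IsL0       bool
	DmlPosTs   uint64
}

// selectSealedTargets mirrors the filter RefreshPlan uses for L0 plans.
func selectSealedTargets(segs []segInfo, channel string, triggerPosTs uint64) []int64 {
	var ids []int64
	for _, s := range segs {
		if s.Channel == channel && s.Flushed && !s.Compacting && !s.IsL0 &&
			s.DmlPosTs < triggerPosTs {
			ids = append(ids, s.ID)
		}
	}
	return ids
}

func main() {
	segs := []segInfo{
		{ID: 300, Channel: "ch-1", Flushed: true, DmlPosTs: 10000},
		{ID: 301, Channel: "ch-1", Flushed: true, DmlPosTs: 20000},
		{ID: 400, Channel: "ch-1", Flushed: true, IsL0: true, DmlPosTs: 10000},
	}
	// Segments 300 and 301 become the L1 targets of the L0 delete compaction plan.
	fmt.Println(selectSealedTargets(segs, "ch-1", 30000)) // [300 301]
}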

View File

@@ -0,0 +1,89 @@
package datacoord
import (
"fmt"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
)
// LevelZeroSegmentsView keeps the L0 segments of one min group (collection, partition, channel)
type LevelZeroSegmentsView struct {
label *CompactionGroupLabel
segments []*SegmentView
earliestGrowingSegmentPos *msgpb.MsgPosition
}
var _ CompactionView = (*LevelZeroSegmentsView)(nil)
func (v *LevelZeroSegmentsView) String() string {
l0strings := lo.Map(v.segments, func(v *SegmentView, _ int) string {
return v.LevelZeroString()
})
return fmt.Sprintf("label=<%s>, posT=<%v>, l0 segments=%v",
v.label.String(),
v.earliestGrowingSegmentPos.GetTimestamp(),
l0strings)
}
func (v *LevelZeroSegmentsView) GetGroupLabel() *CompactionGroupLabel {
if v == nil {
return &CompactionGroupLabel{}
}
return v.label
}
func (v *LevelZeroSegmentsView) GetSegmentsView() []*SegmentView {
if v == nil {
return nil
}
return v.segments
}
func (v *LevelZeroSegmentsView) Equal(others []*SegmentView) bool {
if len(v.segments) != len(others) {
return false
}
IDSelector := func(v *SegmentView, _ int) int64 {
return v.ID
}
diffLeft, diffRight := lo.Difference(lo.Map(others, IDSelector), lo.Map(v.segments, IDSelector))
diffCount := len(diffLeft) + len(diffRight)
return diffCount == 0
}
// Trigger returns a view of all qualified L0 segments, or nil if the trigger thresholds are not reached
func (v *LevelZeroSegmentsView) Trigger() CompactionView {
// Only choose segments with position less than the earliest growing segment position
validSegments := lo.Filter(v.segments, func(view *SegmentView, _ int) bool {
return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp()
})
var (
minDeltaSize = Params.DataCoordCfg.LevelZeroCompactionTriggerMinSize.GetAsFloat()
minDeltaCount = Params.DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.GetAsInt()
curDeltaSize float64
curDeltaCount int
)
for _, segView := range validSegments {
curDeltaSize += segView.DeltaSize
curDeltaCount += segView.DeltalogCount
}
if curDeltaSize < minDeltaSize && curDeltaCount < minDeltaCount {
return nil
}
return &LevelZeroSegmentsView{
label: v.label,
segments: validSegments,
earliestGrowingSegmentPos: v.earliestGrowingSegmentPos,
}
}
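Trigger only considers L0 segments sealed before the earliest growing segment position, then fires when either the accumulated delta size or the delta log count reaches its threshold. A small sketch of the position filter using samber/lo; segView and the timestamps are made-up stand-ins:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

type segView struct {
	ID    int64
	PosTs uint64 // stand-in for dmlPos.GetTimestamp()
}

func main() {
	earliestGrowingTs := uint64(30000) // stand-in for earliestGrowingSegmentPos
	segments := []segView{{100, 20000}, {101, 10000}, {102, 30000}, {103, 40000}}

	// Keep only segments sealed before the earliest growing segment position,
	// as LevelZeroSegmentsView.Trigger does before checking the thresholds.
	valid := lo.Filter(segments, func(v segView, _ int) bool {
		return v.PosTs < earliestGrowingTs
	})
	fmt.Println(lo.Map(valid, func(v segView, _ int) int64 { return v.ID })) // [100 101]
}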

View File

@@ -0,0 +1,158 @@
package datacoord
import (
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
)
func TestLevelZeroSegmentsViewSuite(t *testing.T) {
suite.Run(t, new(LevelZeroSegmentsViewSuite))
}
type LevelZeroSegmentsViewSuite struct {
suite.Suite
v *LevelZeroSegmentsView
}
func genTestL0SegmentView(ID UniqueID, label *CompactionGroupLabel, posTime Timestamp) *SegmentView {
return &SegmentView{
ID: ID,
label: label,
dmlPos: &msgpb.MsgPosition{Timestamp: posTime},
Level: datapb.SegmentLevel_L0,
State: commonpb.SegmentState_Flushed,
}
}
func (s *LevelZeroSegmentsViewSuite) SetupTest() {
label := &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 10,
Channel: "ch-1",
}
segments := []*SegmentView{
genTestL0SegmentView(100, label, 10000),
genTestL0SegmentView(101, label, 10000),
genTestL0SegmentView(102, label, 10000),
}
targetView := &LevelZeroSegmentsView{
label, segments, &msgpb.MsgPosition{Timestamp: 10000},
}
s.True(label.Equal(targetView.GetGroupLabel()))
log.Info("LevelZeroSegmentsView", zap.String("view", targetView.String()))
s.v = targetView
}
func (s *LevelZeroSegmentsViewSuite) TestEqual() {
label := s.v.GetGroupLabel()
tests := []struct {
description string
input []*SegmentView
output bool
}{
{"Different segment numbers", []*SegmentView{genTestL0SegmentView(100, label, 10000)}, false},
{"Same number, diff segmentIDs", []*SegmentView{
genTestL0SegmentView(100, label, 10000),
genTestL0SegmentView(101, label, 10000),
genTestL0SegmentView(200, label, 10000),
}, false},
{"Same", s.v.GetSegmentsView(), true},
}
for _, test := range tests {
s.Run(test.description, func() {
got := s.v.Equal(test.input)
s.Equal(test.output, got)
})
}
}
func (s *LevelZeroSegmentsViewSuite) TestTrigger() {
label := s.v.GetGroupLabel()
views := []*SegmentView{
genTestL0SegmentView(100, label, 20000),
genTestL0SegmentView(101, label, 10000),
genTestL0SegmentView(102, label, 30000),
genTestL0SegmentView(103, label, 40000),
}
s.v.segments = views
tests := []struct {
description string
prepSizeEach float64
prepCountEach int
prepEarliestT Timestamp
expectedSegs []UniqueID
}{
{
"No valid segments by earliest growing segment pos",
64,
20,
10000,
nil,
},
{
"Not qualified",
1,
1,
30000,
nil,
},
{
"Trigger by > TriggerDeltaSize",
8,
1,
30000,
[]UniqueID{100, 101},
},
{
"Trigger by > TriggerDeltaCount",
1,
10,
30000,
[]UniqueID{100, 101},
},
}
for _, test := range tests {
s.Run(test.description, func() {
s.v.earliestGrowingSegmentPos.Timestamp = test.prepEarliestT
for _, view := range s.v.GetSegmentsView() {
if view.dmlPos.Timestamp < test.prepEarliestT {
view.DeltalogCount = test.prepCountEach
view.DeltaSize = test.prepSizeEach
}
}
log.Info("LevelZeroSegmentsView", zap.String("view", s.v.String()))
gotView := s.v.Trigger()
if len(test.expectedSegs) == 0 {
s.Nil(gotView)
} else {
levelZeroView, ok := gotView.(*LevelZeroSegmentsView)
s.True(ok)
s.NotNil(levelZeroView)
gotSegIDs := lo.Map(levelZeroView.GetSegmentsView(), func(v *SegmentView, _ int) int64 {
return v.ID
})
s.ElementsMatch(gotSegIDs, test.expectedSegs)
}
})
}
}

View File

@@ -0,0 +1,177 @@
package datacoord
import (
"sync"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type Scheduler interface {
Submit(t ...*compactionTask)
// Start()
// Stop()
// IsFull() bool
// GetCompactionTasksBySignalID(signalID int64) []compactionTask
}
type CompactionScheduler struct {
taskNumber *atomic.Int32
queuingTasks []*compactionTask
parallelTasks map[int64][]*compactionTask
mu sync.RWMutex
planHandler *compactionPlanHandler
}
var _ Scheduler = (*CompactionScheduler)(nil)
func NewCompactionScheduler() *CompactionScheduler {
return &CompactionScheduler{
taskNumber: atomic.NewInt32(0),
queuingTasks: make([]*compactionTask, 0),
parallelTasks: make(map[int64][]*compactionTask),
}
}
func (s *CompactionScheduler) Submit(tasks ...*compactionTask) {
s.mu.Lock()
s.queuingTasks = append(s.queuingTasks, tasks...)
s.mu.Unlock()
s.taskNumber.Add(int32(len(tasks)))
s.logStatus()
}
// schedule picks at most one task for each node
func (s *CompactionScheduler) schedule() []*compactionTask {
nodeTasks := make(map[int64][]*compactionTask) // nodeID
s.mu.Lock()
defer s.mu.Unlock()
for _, task := range s.queuingTasks {
if _, ok := nodeTasks[task.dataNodeID]; !ok {
nodeTasks[task.dataNodeID] = make([]*compactionTask, 0)
}
nodeTasks[task.dataNodeID] = append(nodeTasks[task.dataNodeID], task)
}
executable := make(map[int64]*compactionTask)
pickPriorPolicy := func(tasks []*compactionTask, exclusiveChannels []string, executing []string) *compactionTask {
for _, task := range tasks {
if lo.Contains(exclusiveChannels, task.plan.GetChannel()) {
continue
}
if task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
// Pick the LevelZeroCompaction task if its channel has no executing compactions
if !lo.Contains(executing, task.plan.GetChannel()) {
return task
}
// Don't schedule any task on this channel while compactions are executing there,
// because the pending LevelZeroCompaction task must run first
exclusiveChannels = append(exclusiveChannels, task.plan.GetChannel())
continue
}
return task
}
return nil
}
// pick at most one task for each node
for node, tasks := range nodeTasks {
parallel := s.parallelTasks[node]
if len(parallel) >= calculateParallel() {
log.Info("Compaction parallel in DataNode reaches the limit", zap.Int64("nodeID", node), zap.Int("parallel", len(parallel)))
continue
}
var (
executing = typeutil.NewSet[string]()
channelsExecPrior = typeutil.NewSet[string]()
)
for _, t := range parallel {
executing.Insert(t.plan.GetChannel())
if t.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
channelsExecPrior.Insert(t.plan.GetChannel())
}
}
picked := pickPriorPolicy(tasks, channelsExecPrior.Collect(), executing.Collect())
if picked != nil {
executable[node] = picked
}
}
var pickPlans []int64
for node, task := range executable {
pickPlans = append(pickPlans, task.plan.PlanID)
if _, ok := s.parallelTasks[node]; !ok {
s.parallelTasks[node] = []*compactionTask{task}
} else {
s.parallelTasks[node] = append(s.parallelTasks[node], task)
}
}
s.queuingTasks = lo.Filter(s.queuingTasks, func(t *compactionTask, _ int) bool {
return !lo.Contains(pickPlans, t.plan.PlanID)
})
// remove nodes that have no running tasks from parallelTasks
for node, tasks := range s.parallelTasks {
if len(tasks) == 0 {
delete(s.parallelTasks, node)
}
}
return lo.Values(executable)
}
func (s *CompactionScheduler) finish(nodeID, planID UniqueID) {
s.mu.Lock()
if parallel, ok := s.parallelTasks[nodeID]; ok {
tasks := lo.Filter(parallel, func(t *compactionTask, _ int) bool {
return t.plan.PlanID != planID
})
s.parallelTasks[nodeID] = tasks
s.taskNumber.Dec()
}
s.mu.Unlock()
log.Info("Compaction finished", zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
s.logStatus()
}
func (s *CompactionScheduler) logStatus() {
s.mu.RLock()
defer s.mu.RUnlock()
waiting := lo.Map(s.queuingTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
})
var executing []int64
for _, tasks := range s.parallelTasks {
executing = append(executing, lo.Map(tasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
})...)
}
if len(waiting) > 0 || len(executing) > 0 {
log.Info("Compaction scheduler status", zap.Int64s("waiting", waiting), zap.Int64s("executing", executing))
}
}
func (s *CompactionScheduler) getExecutingTaskNum() int {
return int(s.taskNumber.Load())
}
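The pick policy in schedule boils down to: at most one task per node per round, an L0 task runs only when nothing is executing on its channel, and a blocked L0 task makes its channel exclusive for the rest of the scan. A simplified, runnable sketch of that rule; task, pickOne and the sample channels are stand-ins, and the real code separately tracks channels that already run L0 compactions:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

type task struct {
	planID  int64
	channel string
	isL0    bool
}

// pickOne mirrors the spirit of pickPriorPolicy: skip channels already marked
// exclusive, pick an L0 task only if nothing executes on its channel,
// otherwise mark that channel exclusive and keep scanning.
func pickOne(queued []task, executing []string) (task, bool) {
	var exclusive []string
	for _, t := range queued {
		if lo.Contains(exclusive, t.channel) {
			continue
		}
		if t.isL0 {
			if !lo.Contains(executing, t.channel) {
				return t, true
			}
			exclusive = append(exclusive, t.channel)
			continue
		}
		return t, true
	}
	return task{}, false
}

func main() {
	executing := []string{"ch-3"} // a compaction is already running on ch-3
	queued := []task{
		{10, "ch-3", true},   // skipped: ch-3 already has an executing compaction
		{13, "ch-3", false},  // skipped: ch-3 is now exclusive
		{11, "ch-11", false}, // picked
	}
	if picked, ok := pickOne(queued, executing); ok {
		fmt.Println("picked plan", picked.planID) // picked plan 11
	}
}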

View File

@@ -0,0 +1,176 @@
package datacoord
import (
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
func TestSchedulerSuite(t *testing.T) {
suite.Run(t, new(SchedulerSuite))
}
type SchedulerSuite struct {
suite.Suite
scheduler *CompactionScheduler
}
func (s *SchedulerSuite) SetupTest() {
s.scheduler = NewCompactionScheduler()
s.scheduler.parallelTasks = map[int64][]*compactionTask{
100: {
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
},
101: {
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
},
102: {
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 4, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
},
}
s.scheduler.taskNumber.Add(4)
}
func (s *SchedulerSuite) TestScheduleEmpty() {
emptySch := NewCompactionScheduler()
tasks := emptySch.schedule()
s.Empty(tasks)
s.Equal(0, emptySch.getExecutingTaskNum())
s.Empty(emptySch.queuingTasks)
s.Empty(emptySch.parallelTasks)
}
func (s *SchedulerSuite) TestScheduleParallelTaskFull() {
// dataNode 100's parallelTasks is full
tests := []struct {
description string
tasks []*compactionTask
expectedOut []UniqueID // planID
}{
{"with L0 tasks", []*compactionTask{
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
// submit the testing tasks
s.scheduler.Submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
gotTasks := s.scheduler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
})
}
}
func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() {
// dataNode 101's parallelTasks has 1 task running, not an L0 task
tests := []struct {
description string
tasks []*compactionTask
expectedOut []UniqueID // planID
}{
{"with L0 tasks diff channel", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{10}},
{"with L0 tasks same channel", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-2", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{11}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{14}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
// submit the testing tasks
s.scheduler.Submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
gotTasks := s.scheduler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
// the second schedule returns empty for full parallelTasks
gotTasks = s.scheduler.schedule()
s.Empty(gotTasks)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
})
}
}
func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
// dataNode 102's parallelTasks has running L0 tasks
// nothing on the same channel can be scheduled
tests := []struct {
description string
tasks []*compactionTask
expectedOut []UniqueID // planID
}{
{"with L0 tasks diff channel", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{10}},
{"with L0 tasks same channel", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{11}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{13}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
// submit the testing tasks
s.scheduler.Submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
gotTasks := s.scheduler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
// the second schedule returns empty for full parallelTasks
if len(gotTasks) > 0 {
gotTasks = s.scheduler.schedule()
s.Empty(gotTasks)
}
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
})
}
}

View File

@@ -23,11 +23,9 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -41,172 +39,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestSchedulerSuite(t *testing.T) {
suite.Run(t, new(SchedulerSuite))
}
type SchedulerSuite struct {
suite.Suite
scheduler *scheduler
}
func (s *SchedulerSuite) SetupTest() {
s.scheduler = newScheduler()
s.scheduler.parallelTasks = map[int64][]*compactionTask{
100: {
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1", Type: datapb.CompactionType_MinorCompaction}},
},
101: {
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
},
102: {
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 4, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
},
}
s.scheduler.taskNumber.Add(4)
}
func (s *SchedulerSuite) TestScheduleEmpty() {
emptySch := newScheduler()
tasks := emptySch.schedule()
s.Empty(tasks)
s.Equal(0, emptySch.getExecutingTaskNum())
s.Empty(emptySch.queuingTasks)
s.Empty(emptySch.parallelTasks)
}
func (s *SchedulerSuite) TestScheduleParallelTaskFull() {
// dataNode 100's paralleTasks is full
tests := []struct {
description string
tasks []*compactionTask
expectedOut []UniqueID // planID
}{
{"with L0 tasks", []*compactionTask{
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
// submit the testing tasks
s.scheduler.submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
gotTasks := s.scheduler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
})
}
}
func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() {
// dataNode 101's paralleTasks has 1 task running, not L0 task
tests := []struct {
description string
tasks []*compactionTask
expectedOut []UniqueID // planID
}{
{"with L0 tasks diff channel", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{10}},
{"with L0 tasks same channel", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-2", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{11}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{14}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
// submit the testing tasks
s.scheduler.submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
gotTasks := s.scheduler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
// the second schedule returns empty for full paralleTasks
gotTasks = s.scheduler.schedule()
s.Empty(gotTasks)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
})
}
}
func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
// dataNode 102's paralleTasks has running L0 tasks
// nothing of the same channel will be able to schedule
tests := []struct {
description string
tasks []*compactionTask
expectedOut []UniqueID // planID
}{
{"with L0 tasks diff channel", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{10}},
{"with L0 tasks same channel", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{11}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{13}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.getExecutingTaskNum())
// submit the testing tasks
s.scheduler.submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
gotTasks := s.scheduler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
}))
// the second schedule returns empty for full paralleTasks
if len(gotTasks) > 0 {
gotTasks = s.scheduler.schedule()
s.Empty(gotTasks)
}
s.Equal(4+len(test.tasks), s.scheduler.getExecutingTaskNum())
})
}
}
func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
type fields struct {
plans map[int64]*compactionTask
@@ -284,7 +116,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
sessions: tt.fields.sessions,
chManager: tt.fields.chManager,
allocator: tt.fields.allocatorFactory(),
scheduler: newScheduler(),
scheduler: NewCompactionScheduler(),
}
Params.Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "1")
c.start()
@@ -334,7 +166,7 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) {
},
},
allocator: newMockAllocator(),
scheduler: newScheduler(),
scheduler: NewCompactionScheduler(),
}
signal := &compactionSignal{id: 100}
@@ -651,7 +483,7 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
plans: plans,
sessions: sessions,
meta: meta,
scheduler: newScheduler(),
scheduler: NewCompactionScheduler(),
}
err := c.completeCompaction(&compactionResult)
@@ -751,7 +583,7 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
plans: plans,
sessions: sessions,
meta: meta,
scheduler: newScheduler(),
scheduler: NewCompactionScheduler(),
}
err := c.completeCompaction(&compactionResult)
@@ -933,7 +765,7 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
plans: tt.fields.plans,
sessions: tt.fields.sessions,
meta: tt.fields.meta,
scheduler: newScheduler(),
scheduler: NewCompactionScheduler(),
}
err := c.updateCompaction(tt.args.ts)
@@ -987,7 +819,7 @@ func Test_newCompactionPlanHandler(t *testing.T) {
chManager: &ChannelManager{},
meta: &meta{},
allocator: newMockAllocator(),
scheduler: newScheduler(),
scheduler: NewCompactionScheduler(),
},
},
}

View File

@@ -27,6 +27,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
@@ -56,8 +57,9 @@ type compactionSignal struct {
isGlobal bool
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
channel string
segmentID UniqueID
pos *msgpb.MsgPosition
}
var _ trigger = (*compactionTrigger)(nil)
@@ -408,7 +410,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
break
}
start := time.Now()
if err := t.fillOriginPlan(plan); err != nil {
if err := fillOriginPlan(t.allocator, plan); err != nil {
log.Warn("failed to fill plan",
zap.Int64("collectionID", signal.collectionID),
zap.Int64s("segmentIDs", segIDs),
@@ -512,7 +514,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
break
}
start := time.Now()
if err := t.fillOriginPlan(plan); err != nil {
if err := fillOriginPlan(t.allocator, plan); err != nil {
log.Warn("failed to fill plan", zap.Error(err))
continue
}
@@ -807,34 +809,18 @@ func isExpandableSmallSegment(segment *SegmentInfo) bool {
return segment.GetNumOfRows() < int64(float64(segment.GetMaxRowNum())*(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()-1))
}
func (t *compactionTrigger) fillOriginPlan(plan *datapb.CompactionPlan) error {
// TODO context
id, err := t.allocator.allocID(context.TODO())
if err != nil {
return err
}
plan.PlanID = id
plan.TimeoutInSeconds = int32(Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt())
return nil
}
func (t *compactionTrigger) isStaleSegment(segment *SegmentInfo) bool {
return time.Since(segment.lastFlushTime).Minutes() >= segmentTimedFlushDuration
}
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDiskIndex bool, compactTime *compactTime) bool {
// binlog numbers are no longer restricted, because they now depend on the number of fields
var binLog int
for _, binlogs := range segment.GetBinlogs() {
binLog += len(binlogs.GetBinlogs())
}
binlogCount := GetBinlogCount(segment.GetBinlogs())
// count all statslog files, only for flush-generated segments
if len(segment.CompactionFrom) == 0 {
var statsLog int
for _, statsLogs := range segment.GetStatslogs() {
statsLog += len(statsLogs.GetBinlogs())
}
statsLogCount := GetBinlogCount(segment.GetStatslogs())
var maxSize int
if isDiskIndex {
@@ -846,19 +832,15 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
// if stats log is more than expected, trigger compaction to reduce stats log size.
// TODO maybe we want to compact to single statslog to reduce watch dml channel cost
// TODO avoid rebuild index twice.
if statsLog > maxSize*2.0 {
log.Info("stats number is too much, trigger compaction", zap.Int64("segmentID", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Stat logs", statsLog))
if statsLogCount > maxSize*2.0 {
log.Info("stats number is too much, trigger compaction", zap.Int64("segmentID", segment.ID), zap.Int("Bin logs", binlogCount), zap.Int("Stat logs", statsLogCount))
return true
}
}
var deltaLog int
for _, deltaLogs := range segment.GetDeltalogs() {
deltaLog += len(deltaLogs.GetBinlogs())
}
if deltaLog > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt() {
log.Info("total delta number is too much, trigger compaction", zap.Int64("segmentID", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Delta logs", deltaLog))
deltaLogCount := GetBinlogCount(segment.GetDeltalogs())
if deltaLogCount > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt() {
log.Info("total delta number is too much, trigger compaction", zap.Int64("segmentID", segment.ID), zap.Int("Bin logs", binlogCount), zap.Int("Delta logs", deltaLogCount))
return true
}

View File

@@ -1926,7 +1926,7 @@ func Test_compactionTrigger_new(t *testing.T) {
}
func Test_compactionTrigger_handleSignal(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler(), newMockVersionManager())
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: NewCompactionScheduler()}, newMockAllocator(), newMockHandler(), newMockVersionManager())
signal := &compactionSignal{
segmentID: 1,
}
@@ -1936,12 +1936,12 @@ }
}
func Test_compactionTrigger_allocTs(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler(), newMockVersionManager())
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: NewCompactionScheduler()}, newMockAllocator(), newMockHandler(), newMockVersionManager())
ts, err := got.allocTs()
assert.NoError(t, err)
assert.True(t, ts > 0)
got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, &FailsAllocator{}, newMockHandler(), newMockVersionManager())
got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: NewCompactionScheduler()}, &FailsAllocator{}, newMockHandler(), newMockVersionManager())
ts, err = got.allocTs()
assert.Error(t, err)
assert.Equal(t, uint64(0), ts)
@@ -1968,7 +1968,7 @@ func Test_compactionTrigger_getCompactTime(t *testing.T) {
}
m := &meta{segments: NewSegmentsInfo(), collections: collections}
got := newCompactionTrigger(m, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(),
got := newCompactionTrigger(m, &compactionPlanHandler{scheduler: NewCompactionScheduler()}, newMockAllocator(),
&ServerHandler{
&Server{
meta: m,

View File

@@ -0,0 +1,128 @@
package datacoord
import (
"context"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
)
type CompactionTriggerType int8
const (
TriggerTypeLevelZeroView CompactionTriggerType = iota + 1
TriggerTypeSegmentSizeView
)
type TriggerManager interface {
Notify(UniqueID, CompactionTriggerType, []CompactionView)
}
// CompactionTriggerManager registers Triggers to TriggerType
// so that when a certain TriggerType happens, the corresponding triggers can
// generate the correct compaction plans.
// Trigger types:
// 1. Change of Views
// - LevelZeroViewTrigger
// - SegmentSizeViewTrigger
//
// 2. SystemIDLE & schedulerIDLE
// 3. Manual Compaction
type CompactionTriggerManager struct {
meta *meta
scheduler Scheduler
handler compactionPlanContext // TODO replace with scheduler
allocator allocator
}
func NewCompactionTriggerManager(meta *meta, alloc allocator, handler compactionPlanContext) *CompactionTriggerManager {
m := &CompactionTriggerManager{
meta: meta,
allocator: alloc,
handler: handler,
}
return m
}
func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionTriggerType, views []CompactionView) {
for _, view := range views {
switch eventType {
case TriggerTypeLevelZeroView:
outView := view.Trigger()
if outView == nil {
continue
}
plan := m.BuildLevelZeroCompactionPlan(outView)
if plan == nil {
continue
}
log.Info("Trigger a LevelZeroCompaction plan", zap.String("output view", outView.String()))
label := outView.GetGroupLabel()
signal := &compactionSignal{
id: taskID,
isForce: false,
isGlobal: true,
collectionID: label.CollectionID,
partitionID: label.PartitionID,
pos: outView.(*LevelZeroSegmentsView).earliestGrowingSegmentPos,
}
// TODO, remove handler, use scheduler
// m.scheduler.Submit(plan)
m.handler.execCompactionPlan(signal, plan)
}
}
}
func (m *CompactionTriggerManager) BuildLevelZeroCompactionPlan(view CompactionView) *datapb.CompactionPlan {
var segmentBinlogs []*datapb.CompactionSegmentBinlogs
levelZeroSegs := lo.Map(view.GetSegmentsView(), func(v *SegmentView, _ int) *datapb.CompactionSegmentBinlogs {
s := m.meta.GetSegment(v.ID)
return &datapb.CompactionSegmentBinlogs{
SegmentID: s.GetID(),
Deltalogs: s.GetDeltalogs(),
Level: datapb.SegmentLevel_L0,
}
})
segmentBinlogs = append(segmentBinlogs, levelZeroSegs...)
plan := &datapb.CompactionPlan{
Type: datapb.CompactionType_Level0DeleteCompaction,
SegmentBinlogs: segmentBinlogs,
Channel: view.GetGroupLabel().Channel,
}
if err := fillOriginPlan(m.allocator, plan); err != nil {
return nil
}
return plan
}
// chanPartSegments is an internal result struct that aggregates SegmentInfos with the same collectionID, partitionID and channelName
type chanPartSegments struct {
collectionID UniqueID
partitionID UniqueID
channelName string
segments []*SegmentInfo
}
func fillOriginPlan(alloc allocator, plan *datapb.CompactionPlan) error {
// TODO context
id, err := alloc.allocID(context.TODO())
if err != nil {
return err
}
plan.PlanID = id
plan.TimeoutInSeconds = Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32()
return nil
}
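BuildLevelZeroCompactionPlan maps every L0 segment view into a CompactionSegmentBinlogs entry and relies on the now-standalone fillOriginPlan to stamp the PlanID from the allocator, which is why fillOriginPlan was moved off compactionTrigger: both the old trigger and the new trigger manager reuse it. A minimal sketch of that shape; segmentBinlogs, compactionPlan, fakeAllocator and the local fillOriginPlan are simplified stand-ins, not the datapb types:

package main

import "fmt"

// Simplified stand-ins for the datapb types and the allocator interface.
type segmentBinlogs struct {
	SegmentID int64
	Level     string
}

type compactionPlan struct {
	PlanID         int64
	Channel        string
	SegmentBinlogs []segmentBinlogs
}

type allocator interface{ allocID() (int64, error) }

type fakeAllocator struct{ next int64 }

func (a *fakeAllocator) allocID() (int64, error) { a.next++; return a.next, nil }

// fillOriginPlan mirrors the refactor: a free function taking the allocator,
// so any trigger implementation can reuse it.
func fillOriginPlan(alloc allocator, plan *compactionPlan) error {
	id, err := alloc.allocID()
	if err != nil {
		return err
	}
	plan.PlanID = id
	return nil
}

func buildLevelZeroPlan(alloc allocator, channel string, l0SegIDs []int64) (*compactionPlan, error) {
	plan := &compactionPlan{Channel: channel}
	for _, id := range l0SegIDs {
		plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinlogs{SegmentID: id, Level: "L0"})
	}
	if err := fillOriginPlan(alloc, plan); err != nil {
		return nil, err
	}
	return plan, nil
}

func main() {
	plan, _ := buildLevelZeroPlan(&fakeAllocator{}, "ch-1", []int64{100, 101, 102})
	fmt.Printf("plan %d on %s with %d L0 segments\n", plan.PlanID, plan.Channel, len(plan.SegmentBinlogs))
}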

View File

@@ -0,0 +1,88 @@
package datacoord
import (
"testing"
"github.com/pingcap/log"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
func TestCompactionTriggerManagerSuite(t *testing.T) {
suite.Run(t, new(CompactionTriggerManagerSuite))
}
type CompactionTriggerManagerSuite struct {
suite.Suite
mockAlloc *NMockAllocator
mockPlanContext *MockCompactionPlanContext
testLabel *CompactionGroupLabel
m *CompactionTriggerManager
}
func (s *CompactionTriggerManagerSuite) SetupTest() {
s.mockAlloc = NewNMockAllocator(s.T())
s.mockPlanContext = NewMockCompactionPlanContext(s.T())
s.testLabel = &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 10,
Channel: "ch-1",
}
meta := &meta{segments: &SegmentsInfo{
segments: genSegmentsForMeta(s.testLabel),
}}
s.m = NewCompactionTriggerManager(meta, s.mockAlloc, s.mockPlanContext)
}
func (s *CompactionTriggerManagerSuite) TestNotify() {
viewManager := NewCompactionViewManager(s.m.meta, s.m, s.m.allocator)
collSegs := s.m.meta.GetCompactableSegmentGroupByCollection()
segments, found := collSegs[1]
s.Require().True(found)
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
return info.GetLevel() == datapb.SegmentLevel_L0
})
segmentViews, levelZeroView := viewManager.GetLatestLevelZeroSegmentWithSignals(1, levelZeroSegments)
s.Require().NotEmpty(segmentViews)
s.Require().Equal(1, len(levelZeroView))
cView, ok := levelZeroView[0].(*LevelZeroSegmentsView)
s.True(ok)
s.NotNil(cView)
log.Info("view", zap.Any("cView", cView))
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil)
s.mockPlanContext.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).
Run(func(signal *compactionSignal, plan *datapb.CompactionPlan) {
s.EqualValues(19530, signal.id)
s.True(signal.isGlobal)
s.False(signal.isForce)
s.EqualValues(30000, signal.pos.GetTimestamp())
s.Equal(s.testLabel.CollectionID, signal.collectionID)
s.Equal(s.testLabel.PartitionID, signal.partitionID)
s.NotNil(plan)
s.Equal(s.testLabel.Channel, plan.GetChannel())
s.Equal(datapb.CompactionType_Level0DeleteCompaction, plan.GetType())
expectedSegs := []int64{100, 101, 102}
gotSegs := lo.Map(plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) int64 {
return b.GetSegmentID()
})
s.ElementsMatch(expectedSegs, gotSegs)
log.Info("generated plan", zap.Any("plan", plan))
}).Return(nil).Once()
s.m.Notify(19530, TriggerTypeLevelZeroView, levelZeroView)
}

View File

@@ -0,0 +1,176 @@
package datacoord
import (
"fmt"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
)
type CompactionView interface {
GetGroupLabel() *CompactionGroupLabel
GetSegmentsView() []*SegmentView
String() string
Trigger() CompactionView
}
type FullViews struct {
collections map[int64][]*SegmentView // collectionID
}
type SegmentViewSelector func(view *SegmentView) bool
func (v *FullViews) GetSegmentViewBy(collectionID UniqueID, selectors SegmentViewSelector) []*SegmentView {
views, ok := v.collections[collectionID]
if !ok {
return nil
}
var ret []*SegmentView
for _, view := range views {
if selectors(view) {
ret = append(ret, view)
}
}
return ret
}
type CompactionGroupLabel struct {
CollectionID UniqueID
PartitionID UniqueID
Channel string
}
func (label *CompactionGroupLabel) IsMinGroup() bool {
return len(label.Channel) != 0 && label.PartitionID != 0 && label.CollectionID != 0
}
func (label *CompactionGroupLabel) Equal(other *CompactionGroupLabel) bool {
return other != nil &&
other.CollectionID == label.CollectionID &&
other.PartitionID == label.PartitionID &&
other.Channel == label.Channel
}
func (label *CompactionGroupLabel) String() string {
return fmt.Sprintf("coll=%d, part=%d, channel=%s", label.CollectionID, label.PartitionID, label.Channel)
}
type SegmentView struct {
ID UniqueID
label *CompactionGroupLabel
State commonpb.SegmentState
Level datapb.SegmentLevel
// positions
startPos *msgpb.MsgPosition
dmlPos *msgpb.MsgPosition
// size
Size float64
ExpireSize float64
DeltaSize float64
// file numbers
BinlogCount int
StatslogCount int
DeltalogCount int
}
func GetSegmentViews(segments ...*SegmentInfo) []*SegmentView {
return lo.Map(segments, func(segment *SegmentInfo, _ int) *SegmentView {
return &SegmentView{
ID: segment.ID,
label: &CompactionGroupLabel{
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
Channel: segment.GetInsertChannel(),
},
State: segment.GetState(),
Level: segment.GetLevel(),
// positions
startPos: segment.GetStartPosition(),
dmlPos: segment.GetDmlPosition(),
DeltaSize: GetBinlogSizeAsBytes(segment.GetDeltalogs()),
DeltalogCount: GetBinlogCount(segment.GetDeltalogs()),
Size: GetBinlogSizeAsBytes(segment.GetBinlogs()),
BinlogCount: GetBinlogCount(segment.GetBinlogs()),
StatslogCount: GetBinlogCount(segment.GetStatslogs()),
// TODO: set the following
// ExpireSize float64
}
})
}
func (v *SegmentView) Equal(other *SegmentView) bool {
return v.Size == other.Size &&
v.ExpireSize == other.ExpireSize &&
v.DeltaSize == other.DeltaSize &&
v.BinlogCount == other.BinlogCount &&
v.StatslogCount == other.StatslogCount &&
v.DeltalogCount == other.DeltalogCount
}
func (v *SegmentView) String() string {
return fmt.Sprintf("ID=%d, label=<%s>, state=%s, level=%s, binlogSize=%.2f, binlogCount=%d, deltaSize=%.2f, deltaCount=%d, expireSize=%.2f",
v.ID, v.label, v.State.String(), v.Level.String(), v.Size, v.BinlogCount, v.DeltaSize, v.DeltalogCount, v.ExpireSize)
}
func (v *SegmentView) LevelZeroString() string {
return fmt.Sprintf("<ID=%d, level=%s, deltaSize=%.2f, deltaCount=%d>",
v.ID, v.Level.String(), v.DeltaSize, v.DeltalogCount)
}
func GetBinlogCount(fieldBinlogs []*datapb.FieldBinlog) int {
var num int
for _, binlog := range fieldBinlogs {
num += len(binlog.GetBinlogs())
}
return num
}
func GetExpiredSizeAsBytes(expireTime Timestamp, fieldBinlogs []*datapb.FieldBinlog) float64 {
var expireSize float64
for _, binlogs := range fieldBinlogs {
for _, l := range binlogs.GetBinlogs() {
// TODO, we should probably estimate expired log entries by total rows
// in binlog and the relationship of timeTo, timeFrom and expire time
if l.TimestampTo < expireTime {
log.Info("mark binlog as expired",
zap.Int64("binlogID", l.GetLogID()),
zap.Uint64("binlogTimestampTo", l.TimestampTo),
zap.Uint64("compactExpireTime", expireTime))
expireSize += float64(l.GetLogSize())
}
}
}
return expireSize
}
func GetBinlogSizeAsBytes(deltaBinlogs []*datapb.FieldBinlog) float64 {
var deltaSize float64
for _, deltaLogs := range deltaBinlogs {
for _, l := range deltaLogs.GetBinlogs() {
deltaSize += float64(l.GetLogSize())
}
}
return deltaSize
}
func buildGroupKey(partitionID UniqueID, channel string) string {
return fmt.Sprintf("%d-%s", partitionID, channel)
}

View File

@@ -0,0 +1,178 @@
package datacoord
import (
"context"
"sync"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/logutil"
)
type CompactionViewManager struct {
view *FullViews
viewGuard sync.RWMutex
meta *meta
eventManager TriggerManager
allocator allocator
closeSig chan struct{}
closeWg sync.WaitGroup
}
func NewCompactionViewManager(meta *meta, trigger TriggerManager, allocator allocator) *CompactionViewManager {
return &CompactionViewManager{
view: &FullViews{
collections: make(map[int64][]*SegmentView),
},
meta: meta,
eventManager: trigger,
allocator: allocator,
closeSig: make(chan struct{}),
}
}
func (m *CompactionViewManager) Start() {
m.closeWg.Add(1)
go m.checkLoop()
}
func (m *CompactionViewManager) Close() {
close(m.closeSig)
m.closeWg.Wait()
}
func (m *CompactionViewManager) checkLoop() {
defer logutil.LogPanic()
defer m.closeWg.Done()
if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
return
}
interval := Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-m.closeSig:
log.Info("Compaction View checkLoop quit")
return
case <-ticker.C:
m.Check()
}
}
}
// A global check could take some time, so we need to record the time it takes.
func (m *CompactionViewManager) Check() {
// Only process L0 compaction for now, so just return if it's not enabled
if !Params.DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
return
}
ctx := context.TODO()
taskID, err := m.allocator.allocID(ctx)
if err != nil {
log.Warn("CompactionViewManager check loop failed, unable to allocate taskID",
zap.Error(err))
return
}
m.viewGuard.Lock()
defer m.viewGuard.Unlock()
events := make(map[CompactionTriggerType][]CompactionView)
latestCollSegs := m.meta.GetCompactableSegmentGroupByCollection()
latestCollIDs := lo.Keys(latestCollSegs)
viewCollIDs := lo.Keys(m.view.collections)
diffAdd, diffRemove := lo.Difference(latestCollIDs, viewCollIDs)
for _, collID := range diffRemove {
delete(m.view.collections, collID)
}
for collID, segments := range latestCollSegs {
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
return info.GetLevel() == datapb.SegmentLevel_L0
})
// For a new collection, TODO: update all segments
// - for now, just update Level Zero segments
if lo.Contains(diffAdd, collID) {
m.view.collections[collID] = GetSegmentViews(levelZeroSegments...)
continue
}
latestLevelZeroViews, signals := m.GetLatestLevelZeroSegmentWithSignals(collID, levelZeroSegments)
if len(latestLevelZeroViews) != 0 {
m.view.collections[collID] = latestLevelZeroViews
}
events[TriggerTypeLevelZeroView] = signals
}
for eType, views := range events {
m.eventManager.Notify(taskID, eType, views)
}
}
func (m *CompactionViewManager) GetLatestLevelZeroSegmentWithSignals(collID UniqueID, LevelZeroSegments []*SegmentInfo) ([]*SegmentView, []CompactionView) {
partChanView := m.BuildLevelZeroSegmentsView(collID, LevelZeroSegments)
var signals []CompactionView
needUpdate := false
for _, latestView := range partChanView {
views := m.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
return v.label.PartitionID == latestView.label.PartitionID &&
v.label.Channel == latestView.label.Channel
})
if !latestView.Equal(views) {
needUpdate = true
signals = append(signals, latestView)
}
}
if needUpdate {
var allViews []*SegmentView
for _, latestView := range partChanView {
allViews = append(allViews, latestView.segments...)
}
return allViews, signals
}
return nil, signals
}
func (m *CompactionViewManager) BuildLevelZeroSegmentsView(collectionID UniqueID, levelZeroSegments []*SegmentInfo) []*LevelZeroSegmentsView {
partChanView := make(map[string]*LevelZeroSegmentsView) // key: "partitionID-channel"
for _, seg := range levelZeroSegments {
key := buildGroupKey(seg.PartitionID, seg.InsertChannel)
if _, ok := partChanView[key]; !ok {
label := &CompactionGroupLabel{
CollectionID: collectionID,
PartitionID: seg.PartitionID,
Channel: seg.InsertChannel,
}
partChanView[key] = &LevelZeroSegmentsView{
label: label,
segments: []*SegmentView{},
earliestGrowingSegmentPos: m.meta.GetEarliestStartPositionOfGrowingSegments(label),
}
}
partChanView[key].segments = append(partChanView[key].segments, GetSegmentViews(seg)[0])
}
return lo.Values(partChanView)
}
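Check syncs the cached views with the latest metadata by diffing collection IDs: collections gone from meta are dropped from the view, newly appearing ones get fresh L0 views, and only then are the per-collection views compared and signals emitted. The ID diff itself is just lo.Difference; a tiny sketch with made-up IDs:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	// Stand-ins for lo.Keys(latestCollSegs) and lo.Keys(m.view.collections).
	latestCollIDs := []int64{1, 2, 4}
	viewCollIDs := []int64{1, 2, 3}

	// diffAdd: collections new in meta; diffRemove: collections gone from meta,
	// which Check deletes from the cached view.
	diffAdd, diffRemove := lo.Difference(latestCollIDs, viewCollIDs)
	fmt.Println(diffAdd, diffRemove) // [4] [3]
}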

View File

@@ -0,0 +1,180 @@
package datacoord
import (
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func TestCompactionViewManagerSuite(t *testing.T) {
suite.Run(t, new(CompactionViewManagerSuite))
}
type CompactionViewManagerSuite struct {
suite.Suite
mockAlloc *NMockAllocator
mockTriggerManager *MockTriggerManager
testLabel *CompactionGroupLabel
m *CompactionViewManager
}
const MB = 1024 * 1024 * 1024
func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo {
segArgs := []struct {
ID UniqueID
Level datapb.SegmentLevel
State commonpb.SegmentState
PosT Timestamp
LogSize int64
LogCount int
}{
{100, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 4 * MB, 1},
{101, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 4 * MB, 1},
{102, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 4 * MB, 1},
{103, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 50000, 4 * MB, 1},
{200, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 50000, 0, 0},
{201, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 30000, 0, 0},
{300, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed, 10000, 0, 0},
{301, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed, 20000, 0, 0},
}
segments := make(map[int64]*SegmentInfo)
for _, arg := range segArgs {
info := genTestSegmentInfo(label, arg.ID, arg.Level, arg.State)
if info.Level == datapb.SegmentLevel_L0 || info.State == commonpb.SegmentState_Flushed {
info.Deltalogs = genTestDeltalogs(arg.LogCount, arg.LogSize)
info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT}
}
if info.State == commonpb.SegmentState_Growing {
info.StartPosition = &msgpb.MsgPosition{Timestamp: arg.PosT}
}
segments[arg.ID] = info
}
return segments
}
func (s *CompactionViewManagerSuite) SetupTest() {
s.mockAlloc = NewNMockAllocator(s.T())
s.mockTriggerManager = NewMockTriggerManager(s.T())
s.testLabel = &CompactionGroupLabel{
CollectionID: 1,
PartitionID: 10,
Channel: "ch-1",
}
meta := &meta{segments: &SegmentsInfo{
segments: genSegmentsForMeta(s.testLabel),
}}
s.m = NewCompactionViewManager(meta, s.mockTriggerManager, s.mockAlloc)
}
func (s *CompactionViewManagerSuite) TestCheckLoop() {
s.Run("Test start and close", func() {
s.m.Start()
s.m.Close()
})
s.Run("Test not enable auto compaction", func() {
paramtable.Get().Save(Params.DataCoordCfg.EnableAutoCompaction.Key, "false")
defer paramtable.Get().Reset(Params.DataCoordCfg.EnableAutoCompaction.Key)
s.m.Start()
s.m.closeWg.Wait()
})
}
func (s *CompactionViewManagerSuite) TestCheck() {
paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "true")
defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key)
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Times(3)
// nothing in the view, just store in the first check
s.Empty(s.m.view.collections)
s.m.Check()
for _, views := range s.m.view.collections {
for _, view := range views {
s.Equal(datapb.SegmentLevel_L0, view.Level)
s.Equal(commonpb.SegmentState_Flushed, view.State)
log.Info("String", zap.String("segment", view.String()))
log.Info("LevelZeroString", zap.String("segment", view.LevelZeroString()))
}
}
// change of meta
addInfo := genTestSegmentInfo(s.testLabel, 19530, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed)
addInfo.Deltalogs = genTestDeltalogs(1, 10)
s.m.meta.Lock()
s.m.meta.segments.segments[addInfo.GetID()] = addInfo
s.m.meta.Unlock()
s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything).
Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) {
s.EqualValues(1, taskID)
s.Equal(TriggerTypeLevelZeroView, tType)
s.Equal(1, len(views))
v, ok := views[0].(*LevelZeroSegmentsView)
s.True(ok)
s.NotNil(v)
expectedSegs := []int64{100, 101, 102, 103, 19530}
gotSegs := lo.Map(v.segments, func(s *SegmentView, _ int) int64 { return s.ID })
s.ElementsMatch(expectedSegs, gotSegs)
s.EqualValues(30000, v.earliestGrowingSegmentPos.GetTimestamp())
log.Info("All views", zap.String("l0 view", v.String()))
}).Once()
s.m.Check()
// clear meta so the next check drops all views
s.m.meta.Lock()
s.m.meta.segments.segments = make(map[int64]*SegmentInfo)
s.m.meta.Unlock()
s.m.Check()
s.Empty(s.m.view.collections)
}
func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.SegmentLevel, state commonpb.SegmentState) *SegmentInfo {
return &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: ID,
CollectionID: label.CollectionID,
PartitionID: label.PartitionID,
InsertChannel: label.Channel,
Level: level,
State: state,
},
}
}
func genTestDeltalogs(logCount int, logSize int64) []*datapb.FieldBinlog {
var binlogs []*datapb.Binlog
for i := 0; i < logCount; i++ {
binlog := &datapb.Binlog{
LogSize: logSize,
}
binlogs = append(binlogs, binlog)
}
return []*datapb.FieldBinlog{
{Binlogs: binlogs},
}
}

View File

@ -216,14 +216,6 @@ func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo {
return cloneColl
}
// chanPartSegments is an internal result struct that aggregates SegmentInfos sharing the same collectionID, partitionID and channelName
type chanPartSegments struct {
collectionID UniqueID
partitionID UniqueID
channelName string
segments []*SegmentInfo
}
// GetSegmentsChanPart returns segments organized in Channel-Partition dimension with selector applied
func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegments {
m.RLock()
@ -796,96 +788,69 @@ func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*Segm
}
// GetSegmentsByChannel returns all segment infos whose insert channel equals the provided `channel`
func (m *meta) GetSegmentsByChannel(dmlCh string) []*SegmentInfo {
m.RLock()
defer m.RUnlock()
infos := make([]*SegmentInfo, 0)
segments := m.segments.GetSegments()
for _, segment := range segments {
if !isSegmentHealthy(segment) || segment.InsertChannel != dmlCh {
continue
}
infos = append(infos, segment)
}
return infos
func (m *meta) GetSegmentsByChannel(channel string) []*SegmentInfo {
return m.SelectSegments(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.InsertChannel == channel
})
}
// GetSegmentsOfCollection get all segments of collection
func (m *meta) GetSegmentsOfCollection(collectionID UniqueID) []*SegmentInfo {
m.RLock()
defer m.RUnlock()
ret := make([]*SegmentInfo, 0)
segments := m.segments.GetSegments()
for _, segment := range segments {
if isSegmentHealthy(segment) && segment.GetCollectionID() == collectionID {
ret = append(ret, segment)
}
}
return ret
return m.SelectSegments(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.GetCollectionID() == collectionID
})
}
// GetSegmentsIDOfCollection returns the ids of all segments belonging to the provided `collectionID`
func (m *meta) GetSegmentsIDOfCollection(collectionID UniqueID) []UniqueID {
m.RLock()
defer m.RUnlock()
ret := make([]UniqueID, 0)
segments := m.segments.GetSegments()
for _, segment := range segments {
if isSegmentHealthy(segment) && segment.CollectionID == collectionID {
ret = append(ret, segment.ID)
}
}
return ret
segments := m.SelectSegments(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.CollectionID == collectionID
})
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 {
return segment.ID
})
}
// GetSegmentsIDOfCollectionWithDropped returns all segment ids of the provided `collectionID`, including dropped segments
func (m *meta) GetSegmentsIDOfCollectionWithDropped(collectionID UniqueID) []UniqueID {
m.RLock()
defer m.RUnlock()
ret := make([]UniqueID, 0)
segments := m.segments.GetSegments()
for _, segment := range segments {
if segment != nil &&
segments := m.SelectSegments(func(segment *SegmentInfo) bool {
return segment != nil &&
segment.GetState() != commonpb.SegmentState_SegmentStateNone &&
segment.GetState() != commonpb.SegmentState_NotExist &&
segment.CollectionID == collectionID {
ret = append(ret, segment.ID)
}
}
return ret
segment.CollectionID == collectionID
})
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 {
return segment.ID
})
}
// GetSegmentsIDOfPartition returns the ids of all segments matching the provided `collectionID` and `partitionID`
func (m *meta) GetSegmentsIDOfPartition(collectionID, partitionID UniqueID) []UniqueID {
m.RLock()
defer m.RUnlock()
ret := make([]UniqueID, 0)
segments := m.segments.GetSegments()
for _, segment := range segments {
if isSegmentHealthy(segment) && segment.CollectionID == collectionID && segment.PartitionID == partitionID {
ret = append(ret, segment.ID)
}
}
return ret
segments := m.SelectSegments(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
segment.CollectionID == collectionID &&
segment.PartitionID == partitionID
})
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 {
return segment.ID
})
}
// GetSegmentsIDOfPartitionWithDropped returns all segment ids of the provided `collectionID` and `partitionID`, including dropped segments
func (m *meta) GetSegmentsIDOfPartitionWithDropped(collectionID, partitionID UniqueID) []UniqueID {
m.RLock()
defer m.RUnlock()
ret := make([]UniqueID, 0)
segments := m.segments.GetSegments()
for _, segment := range segments {
if segment != nil &&
segment.GetState() != commonpb.SegmentState_SegmentStateNone &&
segments := m.SelectSegments(func(segment *SegmentInfo) bool {
return segment.GetState() != commonpb.SegmentState_SegmentStateNone &&
segment.GetState() != commonpb.SegmentState_NotExist &&
segment.CollectionID == collectionID &&
segment.PartitionID == partitionID {
ret = append(ret, segment.ID)
}
}
return ret
segment.PartitionID == partitionID
})
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 {
return segment.ID
})
}
// GetNumRowsOfPartition returns the row count of segments belonging to the provided collection and partition
@ -904,30 +869,16 @@ func (m *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID UniqueID
// GetUnFlushedSegments returns all segments whose state is `Growing` or `Sealed`
func (m *meta) GetUnFlushedSegments() []*SegmentInfo {
m.RLock()
defer m.RUnlock()
ret := make([]*SegmentInfo, 0)
segments := m.segments.GetSegments()
for _, segment := range segments {
if segment.State == commonpb.SegmentState_Growing || segment.State == commonpb.SegmentState_Sealed {
ret = append(ret, segment)
}
}
return ret
return m.SelectSegments(func(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Growing || segment.GetState() == commonpb.SegmentState_Sealed
})
}
// GetFlushingSegments returns all segments whose state is `Flushing`
func (m *meta) GetFlushingSegments() []*SegmentInfo {
m.RLock()
defer m.RUnlock()
ret := make([]*SegmentInfo, 0)
segments := m.segments.GetSegments()
for _, info := range segments {
if info.State == commonpb.SegmentState_Flushing {
ret = append(ret, info)
}
}
return ret
return m.SelectSegments(func(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Flushing
})
}
// SelectSegments returns the segments matching the given selector
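The SelectSegments body itself is elided by this diff. For reference, a minimal sketch of what it presumably looks like, assuming the same RWMutex and segments.GetSegments() access pattern the removed loops above used; this is an inferred sketch, not the literal implementation:

func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
	m.RLock()
	defer m.RUnlock()
	var ret []*SegmentInfo
	// keep every cached segment info accepted by the selector
	for _, info := range m.segments.GetSegments() {
		if selector(info) {
			ret = append(ret, info)
		}
	}
	return ret
}

All of the rewritten getters above reduce to this helper plus, where ids are needed, a lo.Map over the selected segments.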
@ -1350,6 +1301,43 @@ func (m *meta) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID
return m.catalog.GcConfirm(ctx, collectionID, partitionID)
}
func (m *meta) GetCompactableSegmentGroupByCollection() map[int64][]*SegmentInfo {
allSegs := m.SelectSegments(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
isFlush(segment) && // sealed segment
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() // not importing now
})
ret := make(map[int64][]*SegmentInfo)
for _, seg := range allSegs {
if _, ok := ret[seg.CollectionID]; !ok {
ret[seg.CollectionID] = make([]*SegmentInfo, 0)
}
ret[seg.CollectionID] = append(ret[seg.CollectionID], seg)
}
return ret
}
func (m *meta) GetEarliestStartPositionOfGrowingSegments(label *CompactionGroupLabel) *msgpb.MsgPosition {
segments := m.SelectSegments(func(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Growing &&
segment.GetCollectionID() == label.CollectionID &&
segment.GetPartitionID() == label.PartitionID &&
segment.GetInsertChannel() == label.Channel
})
var earliest *msgpb.MsgPosition
for _, seg := range segments {
if earliest == nil || earliest.GetTimestamp() > seg.GetStartPosition().GetTimestamp() {
earliest = seg.GetStartPosition()
}
}
return earliest
}
// addNewSeg updates metrics for a new segment.
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, rowCount int64) {
s.stateChange[state.String()]++

View File

@ -0,0 +1,67 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package datacoord
import mock "github.com/stretchr/testify/mock"
// MockTriggerManager is an autogenerated mock type for the TriggerManager type
type MockTriggerManager struct {
mock.Mock
}
type MockTriggerManager_Expecter struct {
mock *mock.Mock
}
func (_m *MockTriggerManager) EXPECT() *MockTriggerManager_Expecter {
return &MockTriggerManager_Expecter{mock: &_m.Mock}
}
// Notify provides a mock function with given fields: _a0, _a1, _a2
func (_m *MockTriggerManager) Notify(_a0 int64, _a1 CompactionTriggerType, _a2 []CompactionView) {
_m.Called(_a0, _a1, _a2)
}
// MockTriggerManager_Notify_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Notify'
type MockTriggerManager_Notify_Call struct {
*mock.Call
}
// Notify is a helper method to define mock.On call
// - _a0 int64
// - _a1 CompactionTriggerType
// - _a2 []CompactionView
func (_e *MockTriggerManager_Expecter) Notify(_a0 interface{}, _a1 interface{}, _a2 interface{}) *MockTriggerManager_Notify_Call {
return &MockTriggerManager_Notify_Call{Call: _e.mock.On("Notify", _a0, _a1, _a2)}
}
func (_c *MockTriggerManager_Notify_Call) Run(run func(_a0 int64, _a1 CompactionTriggerType, _a2 []CompactionView)) *MockTriggerManager_Notify_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(CompactionTriggerType), args[2].([]CompactionView))
})
return _c
}
func (_c *MockTriggerManager_Notify_Call) Return() *MockTriggerManager_Notify_Call {
_c.Call.Return()
return _c
}
func (_c *MockTriggerManager_Notify_Call) RunAndReturn(run func(int64, CompactionTriggerType, []CompactionView)) *MockTriggerManager_Notify_Call {
_c.Call.Return(run)
return _c
}
// NewMockTriggerManager creates a new instance of MockTriggerManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockTriggerManager(t interface {
mock.TestingT
Cleanup(func())
}) *MockTriggerManager {
mock := &MockTriggerManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
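For context, a sketch of the TriggerManager interface this generated mock satisfies, inferred from the Notify signature above; the real declaration in the datacoord package may carry doc comments or additional methods not visible in this diff:

// TriggerManager (inferred sketch) receives compaction views collected by the view manager.
type TriggerManager interface {
	// Notify hands the views gathered for taskID to the trigger identified by triggerType.
	Notify(taskID int64, triggerType CompactionTriggerType, views []CompactionView)
}

UniqueID is an int64 alias in this codebase, which is why the test suite's Run callback can take taskID as UniqueID while the generated mock asserts args[0].(int64).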

View File

@ -497,6 +497,7 @@ message CompactionSegmentBinlogs {
repeated FieldBinlog field2StatslogPaths = 3;
repeated FieldBinlog deltalogs = 4;
string insert_channel = 5;
SegmentLevel level = 6;
}
message CompactionPlan {
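The new level field lets a compaction plan carry each input segment's level (for example, L0 delta segments versus L1 sealed segments). A hedged Go sketch of populating the generated field when assembling plan inputs; segInfo and the surrounding code are illustrative assumptions, and only the field names shown come from this diff:

// segInfo is assumed to be a *SegmentInfo already selected for the plan.
binlogs := &datapb.CompactionSegmentBinlogs{
	Deltalogs:     segInfo.GetDeltalogs(),
	InsertChannel: segInfo.GetInsertChannel(),
	Level:         segInfo.GetLevel(), // new: preserves the segment level in the plan
}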

File diff suppressed because it is too large