mirror of https://github.com/milvus-io/milvus.git
This reverts commit 79490d7be2
.
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
hotfix-2.4.21
parent
6eadcacd94
commit
08307a868b
|
@ -104,4 +104,3 @@ internal/proto/**/*.pb.go
|
|||
internal/core/src/pb/*.pb.h
|
||||
internal/core/src/pb/*.pb.cc
|
||||
**/legacypb/*.pb.go
|
||||
pkg/streaming/proto/**/*.pb.go
|
||||
|
|
|
@ -1,29 +1,27 @@
|
|||
package datacoord
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
type l0CompactionPolicy struct {
|
||||
meta *meta
|
||||
view *FullViews
|
||||
|
||||
activeCollections *activeCollections
|
||||
emptyLoopCount *atomic.Int64
|
||||
}
|
||||
|
||||
func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy {
|
||||
return &l0CompactionPolicy{
|
||||
meta: meta,
|
||||
activeCollections: newActiveCollections(),
|
||||
meta: meta,
|
||||
// donot share views with other compaction policy
|
||||
view: &FullViews{collections: make(map[int64][]*SegmentView)},
|
||||
emptyLoopCount: atomic.NewInt64(0),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -31,53 +29,93 @@ func (policy *l0CompactionPolicy) Enable() bool {
|
|||
return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
|
||||
}
|
||||
|
||||
// Notify policy to record the active updated(when adding a new L0 segment) collections.
|
||||
func (policy *l0CompactionPolicy) OnCollectionUpdate(collectionID int64) {
|
||||
policy.activeCollections.Record(collectionID)
|
||||
func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
|
||||
// support config hot refresh
|
||||
events := policy.generateEventForLevelZeroViewChange()
|
||||
if len(events) != 0 {
|
||||
// each time when triggers a compaction, the idleTicker would reset
|
||||
policy.emptyLoopCount.Store(0)
|
||||
return events, nil
|
||||
}
|
||||
policy.emptyLoopCount.Inc()
|
||||
|
||||
if policy.emptyLoopCount.Load() >= 3 {
|
||||
policy.emptyLoopCount.Store(0)
|
||||
return policy.generateEventForLevelZeroViewIDLE(), nil
|
||||
}
|
||||
|
||||
return make(map[CompactionTriggerType][]CompactionView), nil
|
||||
}
|
||||
|
||||
func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][]CompactionView, err error) {
|
||||
events = make(map[CompactionTriggerType][]CompactionView)
|
||||
func (policy *l0CompactionPolicy) generateEventForLevelZeroViewChange() (events map[CompactionTriggerType][]CompactionView) {
|
||||
latestCollSegs := policy.meta.GetCompactableSegmentGroupByCollection()
|
||||
latestCollIDs := lo.Keys(latestCollSegs)
|
||||
viewCollIDs := lo.Keys(policy.view.collections)
|
||||
|
||||
// 1. Get active collections
|
||||
activeColls := policy.activeCollections.GetActiveCollections()
|
||||
_, diffRemove := lo.Difference(latestCollIDs, viewCollIDs)
|
||||
for _, collID := range diffRemove {
|
||||
delete(policy.view.collections, collID)
|
||||
}
|
||||
|
||||
// 2. Idle collections = all collections - active collections
|
||||
missCached, idleColls := lo.Difference(activeColls, lo.Keys(latestCollSegs))
|
||||
policy.activeCollections.ClearMissCached(missCached...)
|
||||
refreshedL0Views := policy.RefreshLevelZeroViews(latestCollSegs)
|
||||
if len(refreshedL0Views) > 0 {
|
||||
events = make(map[CompactionTriggerType][]CompactionView)
|
||||
events[TriggerTypeLevelZeroViewChange] = refreshedL0Views
|
||||
}
|
||||
|
||||
idleCollsSet := typeutil.NewUniqueSet(idleColls...)
|
||||
activeL0Views, idleL0Views := []CompactionView{}, []CompactionView{}
|
||||
return events
|
||||
}
|
||||
|
||||
func (policy *l0CompactionPolicy) RefreshLevelZeroViews(latestCollSegs map[int64][]*SegmentInfo) []CompactionView {
|
||||
var allRefreshedL0Veiws []CompactionView
|
||||
for collID, segments := range latestCollSegs {
|
||||
policy.activeCollections.Read(collID)
|
||||
|
||||
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
|
||||
return info.GetLevel() == datapb.SegmentLevel_L0
|
||||
})
|
||||
if len(levelZeroSegments) == 0 {
|
||||
continue
|
||||
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
|
||||
needRefresh, collRefreshedViews := policy.getChangedLevelZeroViews(collID, latestL0Segments)
|
||||
if needRefresh {
|
||||
log.Info("Refresh compaction level zero views",
|
||||
zap.Int64("collectionID", collID),
|
||||
zap.Strings("views", lo.Map(collRefreshedViews, func(view CompactionView, _ int) string {
|
||||
return view.String()
|
||||
})))
|
||||
policy.view.collections[collID] = latestL0Segments
|
||||
}
|
||||
|
||||
labelViews := policy.groupL0ViewsByPartChan(collID, GetViewsByInfo(levelZeroSegments...))
|
||||
if idleCollsSet.Contain(collID) {
|
||||
idleL0Views = append(idleL0Views, labelViews...)
|
||||
} else {
|
||||
activeL0Views = append(activeL0Views, labelViews...)
|
||||
if len(collRefreshedViews) > 0 {
|
||||
allRefreshedL0Veiws = append(allRefreshedL0Veiws, collRefreshedViews...)
|
||||
}
|
||||
|
||||
}
|
||||
if len(activeL0Views) > 0 {
|
||||
events[TriggerTypeLevelZeroViewChange] = activeL0Views
|
||||
}
|
||||
|
||||
if len(idleL0Views) > 0 {
|
||||
events[TriggerTypeLevelZeroViewIDLE] = idleL0Views
|
||||
return allRefreshedL0Veiws
|
||||
}
|
||||
|
||||
func (policy *l0CompactionPolicy) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) (needRefresh bool, refreshed []CompactionView) {
|
||||
cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
|
||||
return v.Level == datapb.SegmentLevel_L0
|
||||
})
|
||||
|
||||
if len(LevelZeroViews) == 0 && len(cachedViews) != 0 {
|
||||
needRefresh = true
|
||||
return
|
||||
}
|
||||
|
||||
latestViews := policy.groupL0ViewsByPartChan(collID, LevelZeroViews)
|
||||
for _, latestView := range latestViews {
|
||||
views := lo.Filter(cachedViews, func(v *SegmentView, _ int) bool {
|
||||
return v.label.Equal(latestView.GetGroupLabel())
|
||||
})
|
||||
|
||||
if !latestView.Equal(views) {
|
||||
refreshed = append(refreshed, latestView)
|
||||
needRefresh = true
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) []CompactionView {
|
||||
func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) map[string]*LevelZeroSegmentsView {
|
||||
partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" as key
|
||||
for _, view := range levelZeroSegments {
|
||||
key := view.label.Key()
|
||||
|
@ -92,71 +130,26 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID,
|
|||
}
|
||||
}
|
||||
|
||||
return lo.Map(lo.Values(partChanView), func(view *LevelZeroSegmentsView, _ int) CompactionView {
|
||||
return view
|
||||
})
|
||||
return partChanView
|
||||
}
|
||||
|
||||
type activeCollection struct {
|
||||
ID int64
|
||||
lastRefresh time.Time
|
||||
readCount *atomic.Int64
|
||||
}
|
||||
|
||||
func newActiveCollection(ID int64) *activeCollection {
|
||||
return &activeCollection{
|
||||
ID: ID,
|
||||
lastRefresh: time.Now(),
|
||||
readCount: atomic.NewInt64(0),
|
||||
}
|
||||
}
|
||||
|
||||
type activeCollections struct {
|
||||
collections map[int64]*activeCollection
|
||||
collGuard sync.RWMutex
|
||||
}
|
||||
|
||||
func newActiveCollections() *activeCollections {
|
||||
return &activeCollections{
|
||||
collections: make(map[int64]*activeCollection),
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *activeCollections) ClearMissCached(collectionIDs ...int64) {
|
||||
ac.collGuard.Lock()
|
||||
defer ac.collGuard.Unlock()
|
||||
lo.ForEach(collectionIDs, func(collID int64, _ int) {
|
||||
delete(ac.collections, collID)
|
||||
})
|
||||
}
|
||||
|
||||
func (ac *activeCollections) Record(collectionID int64) {
|
||||
ac.collGuard.Lock()
|
||||
defer ac.collGuard.Unlock()
|
||||
if _, ok := ac.collections[collectionID]; !ok {
|
||||
ac.collections[collectionID] = newActiveCollection(collectionID)
|
||||
} else {
|
||||
ac.collections[collectionID].lastRefresh = time.Now()
|
||||
ac.collections[collectionID].readCount.Store(0)
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *activeCollections) Read(collectionID int64) {
|
||||
ac.collGuard.Lock()
|
||||
defer ac.collGuard.Unlock()
|
||||
if _, ok := ac.collections[collectionID]; ok {
|
||||
ac.collections[collectionID].readCount.Inc()
|
||||
if ac.collections[collectionID].readCount.Load() >= 3 &&
|
||||
time.Since(ac.collections[collectionID].lastRefresh) > 3*paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second) {
|
||||
log.Info("Active(of deletions) collections become idle", zap.Int64("collectionID", collectionID))
|
||||
delete(ac.collections, collectionID)
|
||||
func (policy *l0CompactionPolicy) generateEventForLevelZeroViewIDLE() map[CompactionTriggerType][]CompactionView {
|
||||
events := make(map[CompactionTriggerType][]CompactionView, 0)
|
||||
for collID := range policy.view.collections {
|
||||
cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
|
||||
return v.Level == datapb.SegmentLevel_L0
|
||||
})
|
||||
if len(cachedViews) > 0 {
|
||||
log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event")
|
||||
grouped := policy.groupL0ViewsByPartChan(collID, cachedViews)
|
||||
events[TriggerTypeLevelZeroViewIDLE] = lo.Map(lo.Values(grouped),
|
||||
func(l0View *LevelZeroSegmentsView, _ int) CompactionView {
|
||||
return l0View
|
||||
})
|
||||
log.Info("Generate TriggerTypeLevelZeroViewIDLE compaction event", zap.Int64("collectionID", collID))
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *activeCollections) GetActiveCollections() []int64 {
|
||||
ac.collGuard.RLock()
|
||||
defer ac.collGuard.RUnlock()
|
||||
|
||||
return lo.Keys(ac.collections)
|
||||
return events
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@ package datacoord
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/zap"
|
||||
|
@ -26,7 +25,6 @@ import (
|
|||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
func TestL0CompactionPolicySuite(t *testing.T) {
|
||||
|
@ -45,63 +43,14 @@ type L0CompactionPolicySuite struct {
|
|||
l0_policy *l0CompactionPolicy
|
||||
}
|
||||
|
||||
func (s *L0CompactionPolicySuite) SetupTest() {
|
||||
s.testLabel = &CompactionGroupLabel{
|
||||
CollectionID: 1,
|
||||
PartitionID: 10,
|
||||
Channel: "ch-1",
|
||||
}
|
||||
|
||||
segments := genSegmentsForMeta(s.testLabel)
|
||||
meta := &meta{segments: NewSegmentsInfo()}
|
||||
for id, segment := range segments {
|
||||
meta.segments.SetSegment(id, segment)
|
||||
}
|
||||
|
||||
s.l0_policy = newL0CompactionPolicy(meta)
|
||||
}
|
||||
|
||||
const MB = 1024 * 1024
|
||||
|
||||
func (s *L0CompactionPolicySuite) TestActiveToIdle() {
|
||||
paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1")
|
||||
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key)
|
||||
|
||||
s.l0_policy.OnCollectionUpdate(1)
|
||||
s.Require().EqualValues(1, s.l0_policy.activeCollections.GetActiveCollections()[0])
|
||||
|
||||
<-time.After(3 * time.Second)
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
gotViews, err := s.l0_policy.Trigger()
|
||||
s.NoError(err)
|
||||
s.NotNil(gotViews)
|
||||
s.NotEmpty(gotViews)
|
||||
_, ok := gotViews[TriggerTypeLevelZeroViewChange]
|
||||
s.True(ok)
|
||||
}
|
||||
|
||||
s.Empty(s.l0_policy.activeCollections.GetActiveCollections())
|
||||
gotViews, err := s.l0_policy.Trigger()
|
||||
s.NoError(err)
|
||||
s.NotNil(gotViews)
|
||||
s.NotEmpty(gotViews)
|
||||
_, ok := gotViews[TriggerTypeLevelZeroViewIDLE]
|
||||
s.True(ok)
|
||||
}
|
||||
|
||||
func (s *L0CompactionPolicySuite) TestTriggerIdle() {
|
||||
s.Require().Empty(s.l0_policy.activeCollections.GetActiveCollections())
|
||||
func (s *L0CompactionPolicySuite) TestTrigger() {
|
||||
s.Require().Empty(s.l0_policy.view.collections)
|
||||
|
||||
events, err := s.l0_policy.Trigger()
|
||||
s.NoError(err)
|
||||
s.NotEmpty(events)
|
||||
|
||||
gotViews, ok := events[TriggerTypeLevelZeroViewChange]
|
||||
s.False(ok)
|
||||
s.Empty(gotViews)
|
||||
|
||||
gotViews, ok = events[TriggerTypeLevelZeroViewIDLE]
|
||||
s.True(ok)
|
||||
s.NotNil(gotViews)
|
||||
s.Equal(1, len(gotViews))
|
||||
|
@ -113,9 +62,31 @@ func (s *L0CompactionPolicySuite) TestTriggerIdle() {
|
|||
s.Equal(datapb.SegmentLevel_L0, view.Level)
|
||||
}
|
||||
log.Info("cView", zap.String("string", cView.String()))
|
||||
}
|
||||
|
||||
func (s *L0CompactionPolicySuite) TestTriggerViewChange() {
|
||||
// Test for idle trigger
|
||||
for i := 0; i < 2; i++ {
|
||||
events, err = s.l0_policy.Trigger()
|
||||
s.NoError(err)
|
||||
s.Equal(0, len(events))
|
||||
}
|
||||
s.EqualValues(2, s.l0_policy.emptyLoopCount.Load())
|
||||
|
||||
events, err = s.l0_policy.Trigger()
|
||||
s.NoError(err)
|
||||
s.EqualValues(0, s.l0_policy.emptyLoopCount.Load())
|
||||
s.Equal(1, len(events))
|
||||
gotViews, ok = events[TriggerTypeLevelZeroViewIDLE]
|
||||
s.True(ok)
|
||||
s.NotNil(gotViews)
|
||||
s.Equal(1, len(gotViews))
|
||||
cView = gotViews[0]
|
||||
s.Equal(s.testLabel, cView.GetGroupLabel())
|
||||
s.Equal(4, len(cView.GetSegmentsView()))
|
||||
for _, view := range cView.GetSegmentsView() {
|
||||
s.Equal(datapb.SegmentLevel_L0, view.Level)
|
||||
}
|
||||
log.Info("cView", zap.String("string", cView.String()))
|
||||
|
||||
segArgs := []struct {
|
||||
ID UniqueID
|
||||
PosT Timestamp
|
||||
|
@ -142,17 +113,34 @@ func (s *L0CompactionPolicySuite) TestTriggerViewChange() {
|
|||
}
|
||||
s.l0_policy.meta = meta
|
||||
|
||||
s.l0_policy.OnCollectionUpdate(s.testLabel.CollectionID)
|
||||
events, err := s.l0_policy.Trigger()
|
||||
events, err = s.l0_policy.Trigger()
|
||||
s.NoError(err)
|
||||
s.Equal(1, len(events))
|
||||
gotViews, ok := events[TriggerTypeLevelZeroViewChange]
|
||||
gotViews, ok = events[TriggerTypeLevelZeroViewChange]
|
||||
s.True(ok)
|
||||
s.Equal(1, len(gotViews))
|
||||
}
|
||||
|
||||
gotViews, ok = events[TriggerTypeLevelZeroViewIDLE]
|
||||
s.False(ok)
|
||||
s.Empty(gotViews)
|
||||
func (s *L0CompactionPolicySuite) TestGenerateEventForLevelZeroViewChange() {
|
||||
s.Require().Empty(s.l0_policy.view.collections)
|
||||
|
||||
events := s.l0_policy.generateEventForLevelZeroViewChange()
|
||||
s.NotEmpty(events)
|
||||
s.NotEmpty(s.l0_policy.view.collections)
|
||||
|
||||
gotViews, ok := events[TriggerTypeLevelZeroViewChange]
|
||||
s.True(ok)
|
||||
s.NotNil(gotViews)
|
||||
s.Equal(1, len(gotViews))
|
||||
|
||||
storedViews, ok := s.l0_policy.view.collections[s.testLabel.CollectionID]
|
||||
s.True(ok)
|
||||
s.NotNil(storedViews)
|
||||
s.Equal(4, len(storedViews))
|
||||
|
||||
for _, view := range storedViews {
|
||||
s.Equal(s.testLabel, view.label)
|
||||
s.Equal(datapb.SegmentLevel_L0, view.Level)
|
||||
}
|
||||
}
|
||||
|
||||
func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo {
|
||||
|
@ -196,6 +184,22 @@ func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo {
|
|||
return segments
|
||||
}
|
||||
|
||||
func (s *L0CompactionPolicySuite) SetupTest() {
|
||||
s.testLabel = &CompactionGroupLabel{
|
||||
CollectionID: 1,
|
||||
PartitionID: 10,
|
||||
Channel: "ch-1",
|
||||
}
|
||||
|
||||
segments := genSegmentsForMeta(s.testLabel)
|
||||
meta := &meta{segments: NewSegmentsInfo()}
|
||||
for id, segment := range segments {
|
||||
meta.segments.SetSegment(id, segment)
|
||||
}
|
||||
|
||||
s.l0_policy = newL0CompactionPolicy(meta)
|
||||
}
|
||||
|
||||
func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.SegmentLevel, state commonpb.SegmentState) *SegmentInfo {
|
||||
return &SegmentInfo{
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/logutil"
|
||||
)
|
||||
|
||||
|
@ -39,27 +40,9 @@ const (
|
|||
TriggerTypeSingle
|
||||
)
|
||||
|
||||
func (t CompactionTriggerType) String() string {
|
||||
switch t {
|
||||
case TriggerTypeLevelZeroViewChange:
|
||||
return "LevelZeroViewChange"
|
||||
case TriggerTypeLevelZeroViewIDLE:
|
||||
return "LevelZeroViewIDLE"
|
||||
case TriggerTypeSegmentSizeViewChange:
|
||||
return "SegmentSizeViewChange"
|
||||
case TriggerTypeClustering:
|
||||
return "Clustering"
|
||||
case TriggerTypeSingle:
|
||||
return "Single"
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
type TriggerManager interface {
|
||||
Start()
|
||||
Stop()
|
||||
OnCollectionUpdate(collectionID int64)
|
||||
ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error)
|
||||
}
|
||||
|
||||
|
@ -78,6 +61,10 @@ type CompactionTriggerManager struct {
|
|||
handler Handler
|
||||
allocator allocator
|
||||
|
||||
view *FullViews
|
||||
// todo handle this lock
|
||||
viewGuard lock.RWMutex
|
||||
|
||||
meta *meta
|
||||
l0Policy *l0CompactionPolicy
|
||||
clusteringPolicy *clusteringCompactionPolicy
|
||||
|
@ -92,8 +79,11 @@ func NewCompactionTriggerManager(alloc allocator, handler Handler, compactionHan
|
|||
allocator: alloc,
|
||||
handler: handler,
|
||||
compactionHandler: compactionHandler,
|
||||
meta: meta,
|
||||
closeSig: make(chan struct{}),
|
||||
view: &FullViews{
|
||||
collections: make(map[int64][]*SegmentView),
|
||||
},
|
||||
meta: meta,
|
||||
closeSig: make(chan struct{}),
|
||||
}
|
||||
m.l0Policy = newL0CompactionPolicy(meta)
|
||||
m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.allocator, m.handler)
|
||||
|
@ -101,12 +91,6 @@ func NewCompactionTriggerManager(alloc allocator, handler Handler, compactionHan
|
|||
return m
|
||||
}
|
||||
|
||||
// OnCollectionUpdate notifies L0Policy about latest collection's L0 segment changes
|
||||
// This tells the l0 triggers about which collections are active
|
||||
func (m *CompactionTriggerManager) OnCollectionUpdate(collectionID int64) {
|
||||
m.l0Policy.OnCollectionUpdate(collectionID)
|
||||
}
|
||||
|
||||
func (m *CompactionTriggerManager) Start() {
|
||||
m.closeWg.Add(1)
|
||||
go m.startLoop()
|
||||
|
@ -211,27 +195,47 @@ func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collection
|
|||
}
|
||||
|
||||
func (m *CompactionTriggerManager) notify(ctx context.Context, eventType CompactionTriggerType, views []CompactionView) {
|
||||
log := log.Ctx(ctx)
|
||||
log.Debug("Start to trigger compactions", zap.String("eventType", eventType.String()))
|
||||
for _, view := range views {
|
||||
outView, reason := view.Trigger()
|
||||
if outView == nil && eventType == TriggerTypeLevelZeroViewIDLE {
|
||||
log.Info("Start to force trigger a level zero compaction")
|
||||
outView, reason = view.ForceTrigger()
|
||||
}
|
||||
|
||||
if outView != nil {
|
||||
log.Info("Success to trigger a compaction, try to submit",
|
||||
zap.String("eventType", eventType.String()),
|
||||
zap.String("reason", reason),
|
||||
zap.String("output view", outView.String()))
|
||||
|
||||
switch eventType {
|
||||
case TriggerTypeLevelZeroViewChange, TriggerTypeLevelZeroViewIDLE:
|
||||
switch eventType {
|
||||
case TriggerTypeLevelZeroViewChange:
|
||||
log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewChange")
|
||||
outView, reason := view.Trigger()
|
||||
if outView != nil {
|
||||
log.Info("Success to trigger a LevelZeroCompaction output view, try to submit",
|
||||
zap.String("reason", reason),
|
||||
zap.String("output view", outView.String()))
|
||||
m.SubmitL0ViewToScheduler(ctx, outView)
|
||||
case TriggerTypeClustering:
|
||||
}
|
||||
case TriggerTypeLevelZeroViewIDLE:
|
||||
log.Debug("Start to trigger a level zero compaction by TriggerTypLevelZeroViewIDLE")
|
||||
outView, reason := view.Trigger()
|
||||
if outView == nil {
|
||||
log.Info("Start to force trigger a level zero compaction by TriggerTypLevelZeroViewIDLE")
|
||||
outView, reason = view.ForceTrigger()
|
||||
}
|
||||
|
||||
if outView != nil {
|
||||
log.Info("Success to trigger a LevelZeroCompaction output view, try to submit",
|
||||
zap.String("reason", reason),
|
||||
zap.String("output view", outView.String()))
|
||||
m.SubmitL0ViewToScheduler(ctx, outView)
|
||||
}
|
||||
case TriggerTypeClustering:
|
||||
log.Debug("Start to trigger a clustering compaction by TriggerTypeClustering")
|
||||
outView, reason := view.Trigger()
|
||||
if outView != nil {
|
||||
log.Info("Success to trigger a ClusteringCompaction output view, try to submit",
|
||||
zap.String("reason", reason),
|
||||
zap.String("output view", outView.String()))
|
||||
m.SubmitClusteringViewToScheduler(ctx, outView)
|
||||
case TriggerTypeSingle:
|
||||
}
|
||||
case TriggerTypeSingle:
|
||||
log.Debug("Start to trigger a single compaction by TriggerTypeSingle")
|
||||
outView, reason := view.Trigger()
|
||||
if outView != nil {
|
||||
log.Info("Success to trigger a L2SingleCompaction output view, try to submit",
|
||||
zap.String("reason", reason),
|
||||
zap.String("output view", outView.String()))
|
||||
m.SubmitSingleViewToScheduler(ctx, outView)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,9 +75,10 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
|
|||
expectedSegID := seg1.ID
|
||||
|
||||
s.Require().Equal(1, len(latestL0Segments))
|
||||
levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments)
|
||||
s.Require().Equal(1, len(levelZeroViews))
|
||||
cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView)
|
||||
needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments)
|
||||
s.True(needRefresh)
|
||||
s.Require().Equal(1, len(levelZeroView))
|
||||
cView, ok := levelZeroView[0].(*LevelZeroSegmentsView)
|
||||
s.True(ok)
|
||||
s.NotNil(cView)
|
||||
log.Info("view", zap.Any("cView", cView))
|
||||
|
@ -100,7 +101,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
|
|||
return nil
|
||||
}).Return(nil).Once()
|
||||
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe()
|
||||
s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewIDLE, levelZeroViews)
|
||||
s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewIDLE, levelZeroView)
|
||||
}
|
||||
|
||||
func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
|
||||
|
@ -118,9 +119,10 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
|
|||
|
||||
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
|
||||
s.Require().NotEmpty(latestL0Segments)
|
||||
levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments)
|
||||
s.Require().Equal(1, len(levelZeroViews))
|
||||
cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView)
|
||||
needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments)
|
||||
s.Require().True(needRefresh)
|
||||
s.Require().Equal(1, len(levelZeroView))
|
||||
cView, ok := levelZeroView[0].(*LevelZeroSegmentsView)
|
||||
s.True(ok)
|
||||
s.NotNil(cView)
|
||||
log.Info("view", zap.Any("cView", cView))
|
||||
|
@ -129,6 +131,8 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
|
|||
s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything).
|
||||
RunAndReturn(func(task *datapb.CompactionTask) error {
|
||||
s.EqualValues(19530, task.GetTriggerID())
|
||||
// s.True(signal.isGlobal)
|
||||
// s.False(signal.isForce)
|
||||
s.EqualValues(30000, task.GetPos().GetTimestamp())
|
||||
s.Equal(s.testLabel.CollectionID, task.GetCollectionID())
|
||||
s.Equal(s.testLabel.PartitionID, task.GetPartitionID())
|
||||
|
@ -140,7 +144,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
|
|||
return nil
|
||||
}).Return(nil).Once()
|
||||
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe()
|
||||
s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroViews)
|
||||
s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView)
|
||||
}
|
||||
|
||||
func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() {
|
||||
|
|
|
@ -541,8 +541,6 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
|
|||
|
||||
if req.GetSegLevel() == datapb.SegmentLevel_L0 {
|
||||
metrics.DataCoordSizeStoredL0Segment.WithLabelValues(fmt.Sprint(req.GetCollectionID())).Observe(calculateL0SegmentSize(req.GetField2StatslogPaths()))
|
||||
|
||||
s.compactionTriggerManager.OnCollectionUpdate(req.GetCollectionID())
|
||||
return merr.Success(), nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue