fix: [2.5] Add and use lifetime context for compaction trigger (#39857) (#39880)

Cherry-pick from master
pr: #39857 
Related to #39856

This PR add lifetime bound context for compaction trigger and use it
instead of context.Background in case of rootcoord down and some grpc
call retry forever

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/39902/head
congqixia 2025-02-14 14:18:14 +08:00 committed by GitHub
parent 418f971d2d
commit 5da9262f58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 22 additions and 22 deletions

View File

@ -49,9 +49,8 @@ func (policy *clusteringCompactionPolicy) Enable() bool {
Params.DataCoordCfg.ClusteringCompactionAutoEnable.GetAsBool()
}
func (policy *clusteringCompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
func (policy *clusteringCompactionPolicy) Trigger(ctx context.Context) (map[CompactionTriggerType][]CompactionView, error) {
log.Info("start trigger clusteringCompactionPolicy...")
ctx := context.Background()
collections := policy.meta.GetCollections()
events := make(map[CompactionTriggerType][]CompactionView, 0)

View File

@ -99,7 +99,7 @@ func (s *ClusteringCompactionPolicySuite) TestEnable() {
func (s *ClusteringCompactionPolicySuite) TestTriggerWithNoCollecitons() {
// trigger with no collections
events, err := s.clusteringCompactionPolicy.Trigger()
events, err := s.clusteringCompactionPolicy.Trigger(context.Background())
s.NoError(err)
gotViews, ok := events[TriggerTypeClustering]
s.True(ok)
@ -132,7 +132,7 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerWithCollections() {
})
// trigger
events, err := s.clusteringCompactionPolicy.Trigger()
events, err := s.clusteringCompactionPolicy.Trigger(context.Background())
s.NoError(err)
gotViews, ok := events[TriggerTypeClustering]
s.True(ok)

View File

@ -46,8 +46,7 @@ func (policy *singleCompactionPolicy) Enable() bool {
return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
}
func (policy *singleCompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
ctx := context.Background()
func (policy *singleCompactionPolicy) Trigger(ctx context.Context) (map[CompactionTriggerType][]CompactionView, error) {
collections := policy.meta.GetCollections()
events := make(map[CompactionTriggerType][]CompactionView, 0)

View File

@ -65,7 +65,7 @@ func (s *SingleCompactionPolicySuite) SetupTest() {
}
func (s *SingleCompactionPolicySuite) TestTrigger() {
events, err := s.singlePolicy.Trigger()
events, err := s.singlePolicy.Trigger(context.Background())
s.NoError(err)
gotViews, ok := events[TriggerTypeSingle]
s.True(ok)

View File

@ -86,8 +86,8 @@ type CompactionTriggerManager struct {
clusteringPolicy *clusteringCompactionPolicy
singlePolicy *singleCompactionPolicy
closeSig chan struct{}
closeWg sync.WaitGroup
cancel context.CancelFunc
closeWg sync.WaitGroup
}
func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, compactionHandler compactionPlanContext, meta *meta) *CompactionTriggerManager {
@ -96,7 +96,6 @@ func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, com
handler: handler,
compactionHandler: compactionHandler,
meta: meta,
closeSig: make(chan struct{}),
}
m.l0Policy = newL0CompactionPolicy(meta)
m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.allocator, m.handler)
@ -112,19 +111,25 @@ func (m *CompactionTriggerManager) OnCollectionUpdate(collectionID int64) {
func (m *CompactionTriggerManager) Start() {
m.closeWg.Add(1)
go m.startLoop()
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel
go func() {
defer m.closeWg.Done()
m.loop(ctx)
}()
}
func (m *CompactionTriggerManager) Stop() {
close(m.closeSig)
if m.cancel != nil {
m.cancel()
}
m.closeWg.Wait()
}
func (m *CompactionTriggerManager) startLoop() {
func (m *CompactionTriggerManager) loop(ctx context.Context) {
defer logutil.LogPanic()
defer m.closeWg.Done()
log := log.Ctx(context.TODO())
log := log.Ctx(ctx)
l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second))
defer l0Ticker.Stop()
clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second))
@ -134,7 +139,7 @@ func (m *CompactionTriggerManager) startLoop() {
log.Info("Compaction trigger manager start")
for {
select {
case <-m.closeSig:
case <-ctx.Done():
log.Info("Compaction trigger manager checkLoop quit")
return
case <-l0Ticker.C:
@ -150,7 +155,6 @@ func (m *CompactionTriggerManager) startLoop() {
log.Warn("Fail to trigger L0 policy", zap.Error(err))
continue
}
ctx := context.Background()
if len(events) > 0 {
for triggerType, views := range events {
m.notify(ctx, triggerType, views)
@ -164,12 +168,11 @@ func (m *CompactionTriggerManager) startLoop() {
log.RatedInfo(10, "Skip trigger clustering compaction since compactionHandler is full")
continue
}
events, err := m.clusteringPolicy.Trigger()
events, err := m.clusteringPolicy.Trigger(ctx)
if err != nil {
log.Warn("Fail to trigger clustering policy", zap.Error(err))
continue
}
ctx := context.Background()
if len(events) > 0 {
for triggerType, views := range events {
m.notify(ctx, triggerType, views)
@ -183,12 +186,11 @@ func (m *CompactionTriggerManager) startLoop() {
log.RatedInfo(10, "Skip trigger single compaction since compactionHandler is full")
continue
}
events, err := m.singlePolicy.Trigger()
events, err := m.singlePolicy.Trigger(ctx)
if err != nil {
log.Warn("Fail to trigger single policy", zap.Error(err))
continue
}
ctx := context.Background()
if len(events) > 0 {
for triggerType, views := range events {
m.notify(ctx, triggerType, views)
@ -200,7 +202,7 @@ func (m *CompactionTriggerManager) startLoop() {
func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error) {
log.Ctx(ctx).Info("receive manual trigger", zap.Int64("collectionID", collectionID))
views, triggerID, err := m.clusteringPolicy.triggerOneCollection(context.Background(), collectionID, true)
views, triggerID, err := m.clusteringPolicy.triggerOneCollection(ctx, collectionID, true)
if err != nil {
return 0, err
}