fix: Milvus panic when compaction disabled and dropping a collection (#34103)

See also: #31059

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/34618/head
XuanYang-cn 2024-07-11 14:44:52 +08:00 committed by GitHub
parent be92147723
commit e0b39d8bf4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 72 additions and 51 deletions

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/logutil" "github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
@ -77,8 +78,8 @@ type compactionTrigger struct {
compactionHandler compactionPlanContext compactionHandler compactionPlanContext
globalTrigger *time.Ticker globalTrigger *time.Ticker
forceMu lock.Mutex forceMu lock.Mutex
quit chan struct{} closeCh lifetime.SafeChan
wg sync.WaitGroup closeWaiter sync.WaitGroup
indexEngineVersionManager IndexEngineVersionManager indexEngineVersionManager IndexEngineVersionManager
@ -105,20 +106,20 @@ func newCompactionTrigger(
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy, estimateNonDiskSegmentPolicy: calBySchemaPolicy,
handler: handler, handler: handler,
closeCh: lifetime.NewSafeChan(),
} }
} }
func (t *compactionTrigger) start() { func (t *compactionTrigger) start() {
t.quit = make(chan struct{})
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
t.wg.Add(2) t.closeWaiter.Add(2)
go func() { go func() {
defer logutil.LogPanic() defer logutil.LogPanic()
defer t.wg.Done() defer t.closeWaiter.Done()
for { for {
select { select {
case <-t.quit: case <-t.closeCh.CloseCh():
log.Info("compaction trigger quit") log.Info("compaction trigger quit")
return return
case signal := <-t.signals: case signal := <-t.signals:
@ -145,7 +146,7 @@ func (t *compactionTrigger) start() {
func (t *compactionTrigger) startGlobalCompactionLoop() { func (t *compactionTrigger) startGlobalCompactionLoop() {
defer logutil.LogPanic() defer logutil.LogPanic()
defer t.wg.Done() defer t.closeWaiter.Done()
// If AutoCompaction disabled, global loop will not start // If AutoCompaction disabled, global loop will not start
if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() { if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
@ -154,7 +155,7 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {
for { for {
select { select {
case <-t.quit: case <-t.closeCh.CloseCh():
t.globalTrigger.Stop() t.globalTrigger.Stop()
log.Info("global compaction loop exit") log.Info("global compaction loop exit")
return return
@ -168,8 +169,8 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {
} }
func (t *compactionTrigger) stop() { func (t *compactionTrigger) stop() {
close(t.quit) t.closeCh.Close()
t.wg.Wait() t.closeWaiter.Wait()
} }
func (t *compactionTrigger) getCollection(collectionID UniqueID) (*collectionInfo, error) { func (t *compactionTrigger) getCollection(collectionID UniqueID) (*collectionInfo, error) {
@ -241,7 +242,7 @@ func (t *compactionTrigger) triggerCompaction() error {
// triggerSingleCompaction trigger a compaction bundled with collection-partition-channel-segment // triggerSingleCompaction trigger a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error { func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error {
// If AutoCompaction disabled, flush request will not trigger compaction // If AutoCompaction disabled, flush request will not trigger compaction
if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() { if !paramtable.Get().DataCoordCfg.EnableAutoCompaction.GetAsBool() && !paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() {
return nil return nil
} }

View File

@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/tsoutil"
) )
@ -494,6 +495,7 @@ func Test_compactionTrigger_force(t *testing.T) {
globalTrigger: tt.fields.globalTrigger, globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy, estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true, testingOnly: true,
} }
_, err := tr.triggerManualCompaction(tt.collectionID) _, err := tr.triggerManualCompaction(tt.collectionID)
@ -519,6 +521,7 @@ func Test_compactionTrigger_force(t *testing.T) {
globalTrigger: tt.fields.globalTrigger, globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy, estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true, testingOnly: true,
} }
tt.collectionID = 1000 tt.collectionID = 1000
@ -543,6 +546,7 @@ func Test_compactionTrigger_force(t *testing.T) {
globalTrigger: tt.fields.globalTrigger, globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy, estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true, testingOnly: true,
} }
@ -781,6 +785,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
globalTrigger: tt.fields.globalTrigger, globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy, estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true, testingOnly: true,
} }
_, err := tr.triggerManualCompaction(tt.args.collectionID) _, err := tr.triggerManualCompaction(tt.args.collectionID)
@ -932,6 +937,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
globalTrigger: tt.fields.globalTrigger, globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy, estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true, testingOnly: true,
} }
tr.start() tr.start()
@ -1119,6 +1125,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
signals: tt.fields.signals, signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler, compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger, globalTrigger: tt.fields.globalTrigger,
closeCh: lifetime.NewSafeChan(),
testingOnly: true, testingOnly: true,
} }
tr.start() tr.start()
@ -1312,6 +1319,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
indexEngineVersionManager: newMockVersionManager(), indexEngineVersionManager: newMockVersionManager(),
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy, estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true, testingOnly: true,
} }
tr.start() tr.start()
@ -1501,6 +1509,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
indexEngineVersionManager: newMockVersionManager(), indexEngineVersionManager: newMockVersionManager(),
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy, estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true, testingOnly: true,
} }
tr.start() tr.start()
@ -1675,6 +1684,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
compactionHandler: tt.fields.compactionHandler, compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger, globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(), indexEngineVersionManager: newMockVersionManager(),
closeCh: lifetime.NewSafeChan(),
testingOnly: true, testingOnly: true,
} }
tr.start() tr.start()

View File

@ -45,6 +45,8 @@ type TriggerManager interface {
ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error)
} }
var _ TriggerManager = (*CompactionTriggerManager)(nil)
// CompactionTriggerManager registers Triggers to TriggerType // CompactionTriggerManager registers Triggers to TriggerType
// so that when the certain TriggerType happens, the corresponding triggers can // so that when the certain TriggerType happens, the corresponding triggers can
// trigger the correct compaction plans. // trigger the correct compaction plans.
@ -93,7 +95,7 @@ func (m *CompactionTriggerManager) Start() {
go m.startLoop() go m.startLoop()
} }
func (m *CompactionTriggerManager) Close() { func (m *CompactionTriggerManager) Stop() {
close(m.closeSig) close(m.closeSig)
m.closeWg.Wait() m.closeWg.Wait()
} }

View File

@ -122,7 +122,7 @@ type Server struct {
compactionTrigger trigger compactionTrigger trigger
compactionHandler compactionPlanContext compactionHandler compactionPlanContext
compactionTriggerManager *CompactionTriggerManager compactionTriggerManager TriggerManager
syncSegmentsScheduler *SyncSegmentsScheduler syncSegmentsScheduler *SyncSegmentsScheduler
metricsCacheManager *metricsinfo.MetricsCacheManager metricsCacheManager *metricsinfo.MetricsCacheManager
@ -352,11 +352,10 @@ func (s *Server) initDataCoord() error {
log.Info("init service discovery done") log.Info("init service discovery done")
s.initTaskScheduler(storageCli) s.initTaskScheduler(storageCli)
if Params.DataCoordCfg.EnableCompaction.GetAsBool() { log.Info("init task scheduler done")
s.createCompactionHandler()
s.createCompactionTrigger() s.initCompaction()
log.Info("init compaction scheduler done") log.Info("init compaction done")
}
if err = s.initSegmentManager(); err != nil { if err = s.initSegmentManager(); err != nil {
return err return err
@ -398,11 +397,6 @@ func (s *Server) Start() error {
func (s *Server) startDataCoord() { func (s *Server) startDataCoord() {
s.taskScheduler.Start() s.taskScheduler.Start()
if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.compactionHandler.start()
s.compactionTrigger.start()
s.compactionTriggerManager.Start()
}
s.startServerLoop() s.startServerLoop()
// http.Register(&http.Handler{ // http.Register(&http.Handler{
@ -497,24 +491,6 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ
s.indexNodeCreator = f s.indexNodeCreator = f
} }
func (s *Server) createCompactionHandler() {
s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator, s.taskScheduler, s.handler)
s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta)
}
func (s *Server) stopCompactionHandler() {
s.compactionHandler.stop()
s.compactionTriggerManager.Close()
}
func (s *Server) createCompactionTrigger() {
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
}
func (s *Server) stopCompactionTrigger() {
s.compactionTrigger.stop()
}
func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) { func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) {
chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params) chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params)
cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(s.ctx) cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(s.ctx)
@ -673,7 +649,44 @@ func (s *Server) initIndexNodeManager() {
} }
} }
func (s *Server) initCompaction() {
s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator, s.taskScheduler, s.handler)
s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta)
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
}
func (s *Server) stopCompaction() {
if s.compactionTrigger != nil {
s.compactionTrigger.stop()
}
if s.compactionTriggerManager != nil {
s.compactionTriggerManager.Stop()
}
if s.compactionHandler != nil {
s.compactionHandler.stop()
}
}
func (s *Server) startCompaction() {
if s.compactionHandler != nil {
s.compactionHandler.start()
}
if s.compactionTrigger != nil {
s.compactionTrigger.start()
}
if s.compactionTriggerManager != nil {
s.compactionTriggerManager.Start()
}
}
func (s *Server) startServerLoop() { func (s *Server) startServerLoop() {
if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.startCompaction()
}
s.serverLoopWg.Add(2) s.serverLoopWg.Add(2)
s.startWatchService(s.serverLoopCtx) s.startWatchService(s.serverLoopCtx)
s.startFlushLoop(s.serverLoopCtx) s.startFlushLoop(s.serverLoopCtx)
@ -1002,10 +1015,7 @@ func (s *Server) Stop() error {
s.importChecker.Close() s.importChecker.Close()
s.syncSegmentsScheduler.Stop() s.syncSegmentsScheduler.Stop()
if Params.DataCoordCfg.EnableCompaction.GetAsBool() { s.stopCompaction()
s.stopCompactionTrigger()
s.stopCompactionHandler()
}
logutil.Logger(s.ctx).Info("datacoord compaction stopped") logutil.Logger(s.ctx).Info("datacoord compaction stopped")
s.taskScheduler.Stop() s.taskScheduler.Stop()

View File

@ -549,12 +549,10 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
s.flushCh <- req.SegmentID s.flushCh <- req.SegmentID
// notify compaction // notify compaction
if paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() { err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(),
err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(), req.GetSegmentID(), req.GetChannel(), false)
req.GetSegmentID(), req.GetChannel(), false) if err != nil {
if err != nil { log.Warn("failed to trigger single compaction")
log.Warn("failed to trigger single compaction")
}
} }
} }