mirror of https://github.com/milvus-io/milvus.git
Fix event manager not closed after datanode stop (#24206)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/24211/head
parent
6865f93ad3
commit
03297a7ef7
|
@@ -103,7 +103,7 @@ type DataNode struct {
|
|||
Role string
|
||||
stateCode atomic.Value // commonpb.StateCode_Initializing
|
||||
flowgraphManager *flowgraphManager
|
||||
eventManagerMap sync.Map // vchannel name -> channelEventManager
|
||||
eventManagerMap *typeutil.ConcurrentMap[string, *channelEventManager]
|
||||
|
||||
clearSignal chan string // vchannel name
|
||||
segmentCache *Cache
|
||||
|
@@ -117,6 +117,7 @@ type DataNode struct {
|
|||
//call once
|
||||
initOnce sync.Once
|
||||
startOnce sync.Once
|
||||
wg sync.WaitGroup
|
||||
sessionMu sync.Mutex // to fix data race
|
||||
session *sessionutil.Session
|
||||
watchKv kv.MetaKv
|
||||
|
@@ -144,6 +145,7 @@ func NewDataNode(ctx context.Context, factory dependency.Factory) *DataNode {
|
|||
segmentCache: newCache(),
|
||||
compactionExecutor: newCompactionExecutor(),
|
||||
|
||||
eventManagerMap: typeutil.NewConcurrentMap[string, *channelEventManager](),
|
||||
flowgraphManager: newFlowgraphManager(),
|
||||
clearSignal: make(chan string, 100),
|
||||
}
|
||||
|
@@ -270,6 +272,7 @@ func (node *DataNode) Init() error {
|
|||
|
||||
// StartWatchChannels start loop to watch channel allocation status via kv(etcd for now)
|
||||
func (node *DataNode) StartWatchChannels(ctx context.Context) {
|
||||
defer node.wg.Done()
|
||||
defer logutil.LogPanic()
|
||||
// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
|
||||
// TODO, this is risky, we'd better watch etcd with revision rather simply a path
|
||||
|
@@ -374,19 +377,20 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
|
|||
log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key))
|
||||
}
|
||||
|
||||
actualManager, loaded := node.eventManagerMap.LoadOrStore(e.vChanName, newChannelEventManager(
|
||||
actualManager, loaded := node.eventManagerMap.GetOrInsert(e.vChanName, newChannelEventManager(
|
||||
node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval,
|
||||
))
|
||||
|
||||
if !loaded {
|
||||
actualManager.(*channelEventManager).Run()
|
||||
actualManager.Run()
|
||||
}
|
||||
|
||||
actualManager.(*channelEventManager).handleEvent(*e)
|
||||
actualManager.handleEvent(*e)
|
||||
|
||||
// Whenever a delete event comes, this eventManager will be removed from map
|
||||
if e.eventType == deleteEventType {
|
||||
if m, loaded := node.eventManagerMap.LoadAndDelete(e.vChanName); loaded {
|
||||
m.(*channelEventManager).Close()
|
||||
if m, loaded := node.eventManagerMap.GetAndRemove(e.vChanName); loaded {
|
||||
m.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -472,6 +476,7 @@ func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
|
|||
// BackGroundGC runs in background to release datanode resources
|
||||
// GOOSE TODO: remove background GC, using ToRelease for drop-collection after #15846
|
||||
func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
|
||||
defer node.wg.Done()
|
||||
log.Info("DataNode Background GC Start")
|
||||
for {
|
||||
select {
|
||||
|
@@ -529,10 +534,12 @@ func (node *DataNode) Start() error {
|
|||
|
||||
node.chunkManager = chunkManager
|
||||
|
||||
node.wg.Add(1)
|
||||
go node.BackGroundGC(node.clearSignal)
|
||||
|
||||
go node.compactionExecutor.start(node.ctx)
|
||||
|
||||
node.wg.Add(1)
|
||||
// Start node watch node
|
||||
go node.StartWatchChannels(node.ctx)
|
||||
|
||||
|
@@ -574,6 +581,12 @@ func (node *DataNode) Stop() error {
|
|||
node.flowgraphManager.stop()
|
||||
|
||||
node.cancel()
|
||||
node.wg.Wait()
|
||||
node.eventManagerMap.Range(func(_ string, m *channelEventManager) bool {
|
||||
m.Close()
|
||||
return true
|
||||
})
|
||||
|
||||
if node.allocator != nil {
|
||||
log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole))
|
||||
node.allocator.Close()
|
||||
|
|
|
@@ -189,6 +189,7 @@ func TestDataNode(t *testing.T) {
|
|||
t.Run("Test BackGroundGC", func(t *testing.T) {
|
||||
vchanNameCh := make(chan string)
|
||||
node.clearSignal = vchanNameCh
|
||||
node.wg.Add(1)
|
||||
go node.BackGroundGC(vchanNameCh)
|
||||
|
||||
testDataSyncs := []struct {
|
||||
|
@@ -385,7 +386,7 @@ func TestWatchChannel(t *testing.T) {
|
|||
chDel <- struct{}{}
|
||||
}, time.Millisecond*100,
|
||||
)
|
||||
node.eventManagerMap.Store(ch, m)
|
||||
node.eventManagerMap.Insert(ch, m)
|
||||
m.Run()
|
||||
defer m.Close()
|
||||
|
||||
|
@@ -422,7 +423,7 @@ func TestWatchChannel(t *testing.T) {
|
|||
chDel <- struct{}{}
|
||||
}, time.Millisecond*100,
|
||||
)
|
||||
node.eventManagerMap.Store(ch, m)
|
||||
node.eventManagerMap.Insert(ch, m)
|
||||
m.Run()
|
||||
defer m.Close()
|
||||
e := &event{
|
||||
|
|
|
@@ -40,6 +40,7 @@ type event struct {
|
|||
|
||||
type channelEventManager struct {
|
||||
sync.Once
|
||||
wg sync.WaitGroup
|
||||
eventChan chan event
|
||||
closeChan chan struct{}
|
||||
handlePutEvent func(watchInfo *datapb.ChannelWatchInfo, version int64) error // node.handlePutEvent
|
||||
|
@@ -64,7 +65,9 @@ func newChannelEventManager(handlePut func(*datapb.ChannelWatchInfo, int64) erro
|
|||
}
|
||||
|
||||
func (e *channelEventManager) Run() {
|
||||
e.wg.Add(1)
|
||||
go func() {
|
||||
defer e.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case event := <-e.eventChan:
|
||||
|
@@ -88,6 +91,7 @@ func (e *channelEventManager) handleEvent(event event) {
|
|||
func (e *channelEventManager) Close() {
|
||||
e.Do(func() {
|
||||
close(e.closeChan)
|
||||
e.wg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue