From a52a52064ddb8d3eeb31097e34e56d420601daf2 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 14 Mar 2024 18:39:04 +0800 Subject: [PATCH] fix: Use lock and map instead of concurrentMap (#31212) See also: #31209 --------- Signed-off-by: yangxuan --- internal/datanode/data_node.go | 12 +++--- internal/datanode/event_manager.go | 56 +++++++++++++++++++++---- internal/datanode/event_manager_test.go | 11 +++-- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index a618e39c32..2654fdaeb9 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -88,8 +88,9 @@ type DataNode struct { Role string stateCode atomic.Value // commonpb.StateCode_Initializing flowgraphManager FlowgraphManager - eventManagerMap *typeutil.ConcurrentMap[string, *channelEventManager] - channelManager ChannelManager + + eventManager *EventManager + channelManager ChannelManager syncMgr syncmgr.SyncManager writeBufferManager writebuffer.BufferManager @@ -142,7 +143,7 @@ func NewDataNode(ctx context.Context, factory dependency.Factory, serverID int64 segmentCache: newCache(), compactionExecutor: newCompactionExecutor(), - eventManagerMap: typeutil.NewConcurrentMap[string, *channelEventManager](), + eventManager: NewEventManager(), flowgraphManager: newFlowgraphManager(), clearSignal: make(chan string, 100), @@ -428,10 +429,7 @@ func (node *DataNode) Stop() error { // Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the flow graph node.cancel() - node.eventManagerMap.Range(func(_ string, m *channelEventManager) bool { - m.Close() - return true - }) + node.eventManager.CloseAll() if node.writeBufferManager != nil { node.writeBufferManager.Stop() diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go index 93479774e6..5fda6f0f2c 100644 --- a/internal/datanode/event_manager.go +++ b/internal/datanode/event_manager.go @@ -130,21 +130,15 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) { log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key)) } - actualManager, loaded := node.eventManagerMap.GetOrInsert(e.vChanName, newChannelEventManager( + actualManager := node.eventManager.GetOrInsert(e.vChanName, newChannelEventManager( node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval, )) - if !loaded { - actualManager.Run() - } - actualManager.handleEvent(*e) // Whenever a delete event comes, this eventManager will be removed from map if e.eventType == deleteEventType { - if m, loaded := node.eventManagerMap.GetAndRemove(e.vChanName); loaded { - m.Close() - } + node.eventManager.Remove(e.vChanName) } } @@ -402,3 +396,49 @@ func GetLiteChannelWatchInfo(watchInfo *datapb.ChannelWatchInfo) *datapb.Channel Progress: watchInfo.GetProgress(), } } + +type EventManager struct { + channelGuard sync.Mutex + channelManagers map[string]*channelEventManager +} + +func NewEventManager() *EventManager { + return &EventManager{ + channelManagers: make(map[string]*channelEventManager), + } +} + +func (m *EventManager) GetOrInsert(channel string, newManager *channelEventManager) *channelEventManager { + m.channelGuard.Lock() + defer m.channelGuard.Unlock() + + eManager, got := m.channelManagers[channel] + if !got { + newManager.Run() + m.channelManagers[channel] = newManager + return newManager + } + + return eManager +} + +func (m *EventManager) Remove(channel string) { + m.channelGuard.Lock() + eManager, got := m.channelManagers[channel] + delete(m.channelManagers, channel) + m.channelGuard.Unlock() + + if got { + eManager.Close() + } +} + +func (m *EventManager) CloseAll() { + m.channelGuard.Lock() + defer m.channelGuard.Unlock() + + for channel, eManager := range m.channelManagers { + delete(m.channelManagers, channel) + eManager.Close() + } +} diff --git a/internal/datanode/event_manager_test.go b/internal/datanode/event_manager_test.go index 24e5ed5037..02b2010407 100644 --- a/internal/datanode/event_manager_test.go +++ b/internal/datanode/event_manager_test.go @@ -218,9 +218,9 @@ func TestWatchChannel(t *testing.T) { chDel <- struct{}{} }, time.Millisecond*100, ) - node.eventManagerMap.Insert(ch, m) - m.Run() - defer m.Close() + + node.eventManager.GetOrInsert(ch, m) + defer node.eventManager.Remove(ch) info = datapb.ChannelWatchInfo{ Vchan: &datapb.VchannelInfo{ChannelName: ch}, @@ -258,9 +258,8 @@ func TestWatchChannel(t *testing.T) { chDel <- struct{}{} }, time.Millisecond*100, ) - node.eventManagerMap.Insert(ch, m) - m.Run() - defer m.Close() + node.eventManager.GetOrInsert(ch, m) + defer node.eventManager.Remove(ch) e := &event{ eventType: putEventType, version: 10000,