fix: Use lock and map instead of concurrentMap (#31212)

See also: #31209

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/31256/head
XuanYang-cn 2024-03-14 18:39:04 +08:00 committed by GitHub
parent 773c64ecbb
commit a52a52064d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 58 additions and 21 deletions

View File

@ -88,8 +88,9 @@ type DataNode struct {
Role string
stateCode atomic.Value // commonpb.StateCode_Initializing
flowgraphManager FlowgraphManager
eventManagerMap *typeutil.ConcurrentMap[string, *channelEventManager]
channelManager ChannelManager
eventManager *EventManager
channelManager ChannelManager
syncMgr syncmgr.SyncManager
writeBufferManager writebuffer.BufferManager
@ -142,7 +143,7 @@ func NewDataNode(ctx context.Context, factory dependency.Factory, serverID int64
segmentCache: newCache(),
compactionExecutor: newCompactionExecutor(),
eventManagerMap: typeutil.NewConcurrentMap[string, *channelEventManager](),
eventManager: NewEventManager(),
flowgraphManager: newFlowgraphManager(),
clearSignal: make(chan string, 100),
@ -428,10 +429,7 @@ func (node *DataNode) Stop() error {
// Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the flow graph
node.cancel()
node.eventManagerMap.Range(func(_ string, m *channelEventManager) bool {
m.Close()
return true
})
node.eventManager.CloseAll()
if node.writeBufferManager != nil {
node.writeBufferManager.Stop()

View File

@ -130,21 +130,15 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key))
}
actualManager, loaded := node.eventManagerMap.GetOrInsert(e.vChanName, newChannelEventManager(
actualManager := node.eventManager.GetOrInsert(e.vChanName, newChannelEventManager(
node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval,
))
if !loaded {
actualManager.Run()
}
actualManager.handleEvent(*e)
// Whenever a delete event comes, this eventManager will be removed from map
if e.eventType == deleteEventType {
if m, loaded := node.eventManagerMap.GetAndRemove(e.vChanName); loaded {
m.Close()
}
node.eventManager.Remove(e.vChanName)
}
}
@ -402,3 +396,49 @@ func GetLiteChannelWatchInfo(watchInfo *datapb.ChannelWatchInfo) *datapb.Channel
Progress: watchInfo.GetProgress(),
}
}
type EventManager struct {
channelGuard sync.Mutex
channelManagers map[string]*channelEventManager
}
func NewEventManager() *EventManager {
return &EventManager{
channelManagers: make(map[string]*channelEventManager),
}
}
func (m *EventManager) GetOrInsert(channel string, newManager *channelEventManager) *channelEventManager {
m.channelGuard.Lock()
defer m.channelGuard.Unlock()
eManager, got := m.channelManagers[channel]
if !got {
newManager.Run()
m.channelManagers[channel] = newManager
return newManager
}
return eManager
}
func (m *EventManager) Remove(channel string) {
m.channelGuard.Lock()
eManager, got := m.channelManagers[channel]
delete(m.channelManagers, channel)
m.channelGuard.Unlock()
if got {
eManager.Close()
}
}
func (m *EventManager) CloseAll() {
m.channelGuard.Lock()
defer m.channelGuard.Unlock()
for channel, eManager := range m.channelManagers {
delete(m.channelManagers, channel)
eManager.Close()
}
}

View File

@ -218,9 +218,9 @@ func TestWatchChannel(t *testing.T) {
chDel <- struct{}{}
}, time.Millisecond*100,
)
node.eventManagerMap.Insert(ch, m)
m.Run()
defer m.Close()
node.eventManager.GetOrInsert(ch, m)
defer node.eventManager.Remove(ch)
info = datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{ChannelName: ch},
@ -258,9 +258,8 @@ func TestWatchChannel(t *testing.T) {
chDel <- struct{}{}
}, time.Millisecond*100,
)
node.eventManagerMap.Insert(ch, m)
m.Run()
defer m.Close()
node.eventManager.GetOrInsert(ch, m)
defer node.eventManager.Remove(ch)
e := &event{
eventType: putEventType,
version: 10000,