fix: [10kcp] Replace outer lock with concurrent map (#38286)

See also: #37493
pr: #37817

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
Co-authored-by: XuanYang-cn <xuan.yang@zilliz.com>
yihao.dai 2024-12-09 19:49:20 +08:00 committed by GitHub
parent df100e5bbe
commit 3d490aa158
3 changed files with 46 additions and 96 deletions
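The change is mechanical but repeated across every method: each per-channel lookup that used to take the manager-wide sync.RWMutex around a plain Go map now goes through typeutil.ConcurrentMap, which synchronizes each operation internally. A minimal standalone sketch of the before/after shape (the package, lockedManager, concurrentManager, and lookup names are illustrative, not code from this PR; the ConcurrentMap calls are the ones visible in the hunks below):

// Sketch only: contrasts the two synchronization styles this commit swaps between.
package sketch

import (
    "sync"

    "github.com/milvus-io/milvus/pkg/util/typeutil"
)

// WriteBuffer stands in for the package's real WriteBuffer interface.
type WriteBuffer interface {
    MemorySize() int64
}

// Before: every per-channel lookup serializes on the manager-wide lock.
type lockedManager struct {
    mut     sync.RWMutex
    buffers map[string]WriteBuffer
}

func (m *lockedManager) lookup(channel string) (WriteBuffer, bool) {
    m.mut.RLock()
    defer m.mut.RUnlock()
    buf, ok := m.buffers[channel]
    return buf, ok
}

// After: the concurrent map guards each operation itself, so lookups on
// different channels no longer contend on a single lock.
type concurrentManager struct {
    buffers *typeutil.ConcurrentMap[string, WriteBuffer]
}

func (m *concurrentManager) lookup(channel string) (WriteBuffer, bool) {
    return m.buffers.Get(channel)
}

Each ConcurrentMap call (Get, Insert, GetOrInsert, GetAndRemove, Range) is atomic on its own, which is all these methods need: every operation touches a single channel's buffer, and the memory check only needs one full pass, which Range provides.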

View File

@@ -16,6 +16,7 @@ import (
     "github.com/milvus-io/milvus/pkg/util/lifetime"
     "github.com/milvus-io/milvus/pkg/util/merr"
     "github.com/milvus-io/milvus/pkg/util/paramtable"
+    "github.com/milvus-io/milvus/pkg/util/typeutil"
 )

 // BufferManager is the interface for WriteBuffer management.
@@ -49,7 +50,7 @@ type BufferManager interface {
 func NewManager(syncMgr syncmgr.SyncManager) BufferManager {
     return &bufferManager{
         syncMgr: syncMgr,
-        buffers: make(map[string]WriteBuffer),
+        buffers: typeutil.NewConcurrentMap[string, WriteBuffer](),
         ch: lifetime.NewSafeChan(),
     }
@@ -57,8 +58,7 @@ func NewManager(syncMgr syncmgr.SyncManager) BufferManager {
 type bufferManager struct {
     syncMgr syncmgr.SyncManager
-    buffers map[string]WriteBuffer
-    mut sync.RWMutex
+    buffers *typeutil.ConcurrentMap[string, WriteBuffer]

     wg sync.WaitGroup
     ch lifetime.SafeChan
@@ -93,13 +93,11 @@ func (m *bufferManager) memoryCheck() {
         return
     }
     startTime := time.Now()
-    m.mut.RLock()
     defer func() {
         dur := time.Since(startTime)
         if dur > 30*time.Second {
             log.Warn("memory check takes too long", zap.Duration("time", dur))
         }
-        m.mut.RUnlock()
     }()

     for {
@@ -112,7 +110,7 @@
             return mem / 1024 / 1024
         }

-        for chanName, buf := range m.buffers {
+        m.buffers.Range(func(chanName string, buf WriteBuffer) bool {
             size := buf.MemorySize()
             total += size
             if size > candiSize {
@@ -120,7 +118,8 @@
                 candidate = buf
                 candiChan = chanName
             }
-        }
+            return true
+        })

         totalMemory := hardware.GetMemoryCount()
         memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryForceSyncWatermark.GetAsFloat()
@@ -146,28 +145,23 @@ func (m *bufferManager) Stop() {
 // Register a new WriteBuffer for channel.
 func (m *bufferManager) Register(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, opts ...WriteBufferOption) error {
-    m.mut.Lock()
-    defer m.mut.Unlock()
-
-    _, ok := m.buffers[channel]
-    if ok {
-        return merr.WrapErrChannelReduplicate(channel)
-    }
     buf, err := NewWriteBuffer(channel, metacache, storageV2Cache, m.syncMgr, opts...)
     if err != nil {
         return err
     }

-    m.buffers[channel] = buf
+    _, loaded := m.buffers.GetOrInsert(channel, buf)
+    if loaded {
+        buf.Close(false)
+        return merr.WrapErrChannelReduplicate(channel)
+    }
     return nil
 }

 // SealSegments call sync segment and change segments state to Flushed.
 func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error {
-    m.mut.RLock()
-    buf, ok := m.buffers[channel]
-    m.mut.RUnlock()
-    if !ok {
+    buf, loaded := m.buffers.Get(channel)
+    if !loaded {
         log.Ctx(ctx).Warn("write buffer not found when flush segments",
             zap.String("channel", channel),
             zap.Int64s("segmentIDs", segmentIDs))
@@ -178,11 +172,8 @@ func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmen
 }

 func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error {
-    m.mut.RLock()
-    buf, ok := m.buffers[channel]
-    m.mut.RUnlock()
-    if !ok {
+    buf, loaded := m.buffers.Get(channel)
+    if !loaded {
         log.Ctx(ctx).Warn("write buffer not found when flush channel",
             zap.String("channel", channel),
             zap.Uint64("flushTs", flushTs))
@@ -194,11 +185,8 @@ func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushT
 // BufferData put data into channel write buffer.
 func (m *bufferManager) BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
-    m.mut.RLock()
-    buf, ok := m.buffers[channel]
-    m.mut.RUnlock()
-    if !ok {
+    buf, loaded := m.buffers.Get(channel)
+    if !loaded {
         log.Ctx(context.Background()).Warn("write buffer not found when buffer data",
             zap.String("channel", channel))
         return merr.WrapErrChannelNotFound(channel)
@@ -209,11 +197,8 @@ func (m *bufferManager) BufferData(channel string, insertMsgs []*msgstream.Inser
 // GetCheckpoint returns checkpoint for provided channel.
 func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) {
-    m.mut.RLock()
-    buf, ok := m.buffers[channel]
-    m.mut.RUnlock()
-    if !ok {
+    buf, loaded := m.buffers.Get(channel)
+    if !loaded {
         return nil, false, merr.WrapErrChannelNotFound(channel)
     }
     cp := buf.GetCheckpoint()
@@ -223,10 +208,8 @@ func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool,
 }

 func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) {
-    m.mut.Lock()
-    defer m.mut.Unlock()
-    buf, ok := m.buffers[channel]
-    if !ok {
+    buf, loaded := m.buffers.Get(channel)
+    if !loaded {
         return
     }
     flushTs := buf.GetFlushTimestamp()
@@ -239,12 +222,8 @@ func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) {
 // RemoveChannel remove channel WriteBuffer from manager.
 // this method discards all buffered data since datanode no longer has the ownership
 func (m *bufferManager) RemoveChannel(channel string) {
-    m.mut.Lock()
-    buf, ok := m.buffers[channel]
-    delete(m.buffers, channel)
-    m.mut.Unlock()
-
-    if !ok {
+    buf, loaded := m.buffers.GetAndRemove(channel)
+    if !loaded {
         log.Warn("failed to remove channel, channel not maintained in manager", zap.String("channel", channel))
         return
     }
@@ -255,12 +234,8 @@ func (m *bufferManager) RemoveChannel(channel string) {
 // DropChannel removes channel WriteBuffer and process `DropChannel`
 // this method will save all buffered data
 func (m *bufferManager) DropChannel(channel string) {
-    m.mut.Lock()
-    buf, ok := m.buffers[channel]
-    delete(m.buffers, channel)
-    m.mut.Unlock()
-
-    if !ok {
+    buf, loaded := m.buffers.GetAndRemove(channel)
+    if !loaded {
         log.Warn("failed to drop channel, channel not maintained in manager", zap.String("channel", channel))
         return
     }
@@ -269,11 +244,8 @@ func (m *bufferManager) DropChannel(channel string) {
 }

 func (m *bufferManager) DropPartitions(channel string, partitionIDs []int64) {
-    m.mut.RLock()
-    buf, ok := m.buffers[channel]
-    m.mut.RUnlock()
-    if !ok {
+    buf, loaded := m.buffers.Get(channel)
+    if !loaded {
         log.Warn("failed to drop partition, channel not maintained in manager", zap.String("channel", channel), zap.Int64s("partitionIDs", partitionIDs))
         return
     }
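One place where dropping the outer lock changes the control flow rather than just the locking is Register: the existence check and the insert can no longer sit inside one critical section, so the buffer is constructed first and GetOrInsert then decides atomically which caller owns the channel; a caller that loses the race closes the buffer it just built. A standalone sketch of that pattern (buffer, errDuplicate, and register are illustrative stand-ins for WriteBuffer, merr.WrapErrChannelReduplicate, and bufferManager.Register, not code from this PR):

package main

import (
    "errors"
    "fmt"

    "github.com/milvus-io/milvus/pkg/util/typeutil"
)

// buffer is a hypothetical stand-in for the package's WriteBuffer.
type buffer struct{ channel string }

// Close releases whatever the buffer holds; the flag mirrors WriteBuffer.Close(drop bool).
func (b *buffer) Close(drop bool) {}

var errDuplicate = errors.New("channel already registered")

// register mirrors the shape of bufferManager.Register above: build the buffer first,
// then let GetOrInsert decide atomically which caller owns the channel slot.
func register(buffers *typeutil.ConcurrentMap[string, *buffer], channel string) error {
    buf := &buffer{channel: channel}
    if _, loaded := buffers.GetOrInsert(channel, buf); loaded {
        // Another goroutine registered the channel first; discard the buffer we
        // built optimistically and report the duplicate.
        buf.Close(false)
        return errDuplicate
    }
    return nil
}

func main() {
    buffers := typeutil.NewConcurrentMap[string, *buffer]()
    fmt.Println(register(buffers, "ch-1")) // <nil>
    fmt.Println(register(buffers, "ch-1")) // channel already registered
}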

View File

@@ -98,10 +98,7 @@ func (s *ManagerSuite) TestFlushSegments() {
     defer cancel()
     wb := NewMockWriteBuffer(s.T())
-    s.manager.mut.Lock()
-    s.manager.buffers[s.channelName] = wb
-    s.manager.mut.Unlock()
+    s.manager.buffers.Insert(s.channelName, wb)
     wb.EXPECT().SealSegments(mock.Anything, mock.Anything).Return(nil)
@@ -120,10 +117,7 @@ func (s *ManagerSuite) TestBufferData() {
     s.Run("normal_buffer_data", func() {
         wb := NewMockWriteBuffer(s.T())
-        s.manager.mut.Lock()
-        s.manager.buffers[s.channelName] = wb
-        s.manager.mut.Unlock()
+        s.manager.buffers.Insert(s.channelName, wb)
         wb.EXPECT().BufferData(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
         err := manager.BufferData(s.channelName, nil, nil, nil, nil)
@@ -141,10 +135,7 @@ func (s *ManagerSuite) TestGetCheckpoint() {
     s.Run("normal_checkpoint", func() {
         wb := NewMockWriteBuffer(s.T())
-        manager.mut.Lock()
-        manager.buffers[s.channelName] = wb
-        manager.mut.Unlock()
+        manager.buffers.Insert(s.channelName, wb)
         pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0)}
         wb.EXPECT().GetCheckpoint().Return(pos)
         wb.EXPECT().GetFlushTimestamp().Return(nonFlushTS)
@@ -157,10 +148,7 @@ func (s *ManagerSuite) TestGetCheckpoint() {
     s.Run("checkpoint_need_update", func() {
         wb := NewMockWriteBuffer(s.T())
-        manager.mut.Lock()
-        manager.buffers[s.channelName] = wb
-        manager.mut.Unlock()
+        manager.buffers.Insert(s.channelName, wb)
         cpTimestamp := tsoutil.ComposeTSByTime(time.Now(), 0)
         pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: cpTimestamp}
@@ -207,10 +195,7 @@ func (s *ManagerSuite) TestDropPartitions() {
         wb := NewMockWriteBuffer(s.T())
         wb.EXPECT().DropPartitions(mock.Anything).Return()
-        manager.mut.Lock()
-        manager.buffers[s.channelName] = wb
-        manager.mut.Unlock()
+        manager.buffers.Insert(s.channelName, wb)
         manager.DropPartitions(s.channelName, []int64{1})
     })
 }
@@ -248,10 +233,7 @@ func (s *ManagerSuite) TestMemoryCheck() {
         }
         flag.Store(true)
     }).Return()
-    manager.mut.Lock()
-    manager.buffers[s.channelName] = wb
-    manager.mut.Unlock()
+    manager.buffers.Insert(s.channelName, wb)
     manager.Start()
     defer manager.Stop()

View File

@@ -65,38 +65,35 @@ type checkpointCandidate struct {
 }

 type checkpointCandidates struct {
-    candidates map[string]*checkpointCandidate
-    mu sync.RWMutex
+    candidates *typeutil.ConcurrentMap[string, *checkpointCandidate]
 }

+func getCandidatesKey(segmentID int64, timestamp uint64) string {
+    return fmt.Sprintf("%d-%d", segmentID, timestamp)
+}
+
 func newCheckpointCandiates() *checkpointCandidates {
     return &checkpointCandidates{
-        candidates: make(map[string]*checkpointCandidate),
+        candidates: typeutil.NewConcurrentMap[string, *checkpointCandidate](), // segmentID-ts
     }
 }

 func (c *checkpointCandidates) Remove(segmentID int64, timestamp uint64) {
-    c.mu.Lock()
-    defer c.mu.Unlock()
-    delete(c.candidates, fmt.Sprintf("%d-%d", segmentID, timestamp))
+    c.candidates.Remove(getCandidatesKey(segmentID, timestamp))
 }

 func (c *checkpointCandidates) Add(segmentID int64, position *msgpb.MsgPosition, source string) {
-    c.mu.Lock()
-    defer c.mu.Unlock()
-    c.candidates[fmt.Sprintf("%d-%d", segmentID, position.GetTimestamp())] = &checkpointCandidate{segmentID, position, source}
+    c.candidates.Insert(getCandidatesKey(segmentID, position.GetTimestamp()), &checkpointCandidate{segmentID, position, source})
 }

 func (c *checkpointCandidates) GetEarliestWithDefault(def *checkpointCandidate) *checkpointCandidate {
-    c.mu.RLock()
-    defer c.mu.RUnlock()
-
     var result *checkpointCandidate = def
-    for _, candidate := range c.candidates {
+    c.candidates.Range(func(_ string, candidate *checkpointCandidate) bool {
         if result == nil || candidate.position.GetTimestamp() < result.position.GetTimestamp() {
             result = candidate
         }
-    }
+        return true
+    })
     return result
 }
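GetEarliestWithDefault above now folds over the candidates with Range instead of iterating the map under a read lock. A reduced standalone sketch of the same fold (candidate and earliest are simplified stand-ins for checkpointCandidate and GetEarliestWithDefault, with a bare timestamp in place of msgpb.MsgPosition):

package main

import (
    "fmt"

    "github.com/milvus-io/milvus/pkg/util/typeutil"
)

// candidate is a pared-down stand-in for checkpointCandidate.
type candidate struct {
    segmentID int64
    timestamp uint64
}

// earliest keeps the entry with the smallest timestamp seen during one Range pass,
// falling back to def when the map is empty.
func earliest(m *typeutil.ConcurrentMap[string, *candidate], def *candidate) *candidate {
    result := def
    m.Range(func(_ string, c *candidate) bool {
        if result == nil || c.timestamp < result.timestamp {
            result = c
        }
        return true // keep visiting the remaining entries
    })
    return result
}

func main() {
    m := typeutil.NewConcurrentMap[string, *candidate]()
    // Keys follow the same segmentID-timestamp scheme as getCandidatesKey above.
    m.Insert("100-20", &candidate{segmentID: 100, timestamp: 20})
    m.Insert("101-7", &candidate{segmentID: 101, timestamp: 7})
    fmt.Println(earliest(m, nil).segmentID) // prints 101, the earliest candidate
}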
@@ -111,8 +108,6 @@ func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cach
 // writeBufferBase is the common component for buffering data
 type writeBufferBase struct {
-    mut sync.RWMutex
-
     collectionID int64
     channelName string
@@ -123,6 +118,7 @@ type writeBufferBase struct {
     estSizePerRecord int
     metaCache metacache.MetaCache
+    mut sync.RWMutex
     buffers map[int64]*segmentBuffer // segmentID => segmentBuffer
     syncPolicies []SyncPolicy