Fix mem leak of flush cache in DataNode (#6862)

Resolves: #6858

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/7042/head
XuanYang-cn 2021-08-11 14:24:09 +08:00 committed by GitHub
parent 76b92c3b69
commit 9d81a90402
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 36 additions and 16 deletions

View File

@ -19,11 +19,8 @@ func (c *Cache) checkIfCached(key UniqueID) bool {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
if _, ok := c.cacheMap[key]; !ok {
return false
}
return true
_, ok := c.cacheMap[key]
return ok
}
func (c *Cache) Cache(segID UniqueID) {
@ -32,3 +29,12 @@ func (c *Cache) Cache(segID UniqueID) {
c.cacheMap[segID] = true
}
// Remove evicts every given segment ID from the cache.
// IDs that are not present are ignored (delete on a missing key is a no-op).
func (c *Cache) Remove(segIDs ...UniqueID) {
	c.cacheMu.Lock()
	defer c.cacheMu.Unlock()

	for _, segID := range segIDs {
		delete(c.cacheMap, segID)
	}
}

View File

@ -13,4 +13,7 @@ func TestSegmentCache(t *testing.T) {
segCache.Cache(UniqueID(0))
assert.True(t, segCache.checkIfCached(0))
segCache.Remove(UniqueID(0))
assert.False(t, segCache.checkIfCached(0))
}

View File

@ -308,7 +308,7 @@ func (node *DataNode) getChannelNamebySegmentID(segID UniqueID) string {
node.chanMut.RLock()
defer node.chanMut.RUnlock()
for name, dataSync := range node.vchan2SyncService {
if dataSync.replica.hasSegment(segID) {
if dataSync.replica.hasSegment(segID, false) {
return name
}
}
@ -377,7 +377,10 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
dmlFlushedCh := make(chan []*datapb.ID2PathList, len(req.SegmentIDs))
for _, id := range req.SegmentIDs {
chanName := node.getChannelNamebySegmentID(id)
log.Info("vchannel", zap.String("name", chanName))
log.Debug("vchannel",
zap.String("name", chanName),
zap.Int64("SegmentID", id))
if len(chanName) == 0 {
status.Reason = fmt.Sprintf("DataNode not find segment %d!", id)
return status, nil
@ -426,7 +429,10 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
status.Reason = fmt.Sprintf("flush failed segment list = %s", failedSegments)
return status, nil
}
log.Debug("FlushSegments Done")
node.segmentCache.Remove(req.SegmentIDs...)
log.Debug("FlushSegments Done",
zap.Int64s("segments", req.SegmentIDs))
status.ErrorCode = commonpb.ErrorCode_Success
metrics.DataNodeFlushSegmentsCounter.WithLabelValues(MetricRequestsSuccess).Inc()

View File

@ -180,7 +180,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
collID := msg.GetCollectionID()
partitionID := msg.GetPartitionID()
if !ibNode.replica.hasSegment(currentSegID) {
if !ibNode.replica.hasSegment(currentSegID, true) {
err := ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(),
iMsg.startPositions[0], iMsg.endPositions[0])
if err != nil {
@ -791,7 +791,7 @@ func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
}
func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID, ts Timestamp) (meta *etcdpb.CollectionMeta, err error) {
if !ibNode.replica.hasSegment(segmentID) {
if !ibNode.replica.hasSegment(segmentID, true) {
return nil, fmt.Errorf("No such segment %d in the replica", segmentID)
}

View File

@ -37,7 +37,7 @@ type Replica interface {
listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint
updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition)
updateSegmentCheckPoint(segID UniqueID)
hasSegment(segID UniqueID) bool
hasSegment(segID UniqueID, countFlushed bool) bool
updateStatistics(segID UniqueID, numRows int64) error
getSegmentStatisticsUpdates(segID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
@ -283,13 +283,17 @@ func (replica *SegmentReplica) removeSegment(segID UniqueID) error {
}
// hasSegment checks whether this replica has a segment according to segment ID; flushed segments are counted only when countFlushed is true.
func (replica *SegmentReplica) hasSegment(segID UniqueID) bool {
func (replica *SegmentReplica) hasSegment(segID UniqueID, countFlushed bool) bool {
	replica.segMu.RLock()
	defer replica.segMu.RUnlock()

	// A segment may live in one of three maps depending on its lifecycle stage.
	if _, ok := replica.newSegments[segID]; ok {
		return true
	}
	if _, ok := replica.normalSegments[segID]; ok {
		return true
	}
	// Flushed segments only count when the caller explicitly asks for them.
	if countFlushed {
		_, ok := replica.flushedSegments[segID]
		return ok
	}
	return false
}

View File

@ -109,13 +109,14 @@ func TestSegmentReplica(t *testing.T) {
t.Run("Test inner function segment", func(t *testing.T) {
replica := newSegmentReplica(rc, collID)
assert.False(t, replica.hasSegment(0))
assert.False(t, replica.hasSegment(0, true))
assert.False(t, replica.hasSegment(0, false))
startPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(100)}
endPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(200)}
err := replica.addNewSegment(0, 1, 2, "insert-01", startPos, endPos)
assert.NoError(t, err)
assert.True(t, replica.hasSegment(0))
assert.True(t, replica.hasSegment(0, true))
assert.Equal(t, 1, len(replica.newSegments))
seg, ok := replica.newSegments[UniqueID(0)]
@ -141,7 +142,7 @@ func TestSegmentReplica(t *testing.T) {
cp := &segmentCheckPoint{int64(10), *cpPos}
err = replica.addNormalSegment(1, 1, 2, "insert-01", int64(10), cp)
assert.NoError(t, err)
assert.True(t, replica.hasSegment(1))
assert.True(t, replica.hasSegment(1, true))
assert.Equal(t, 1, len(replica.normalSegments))
seg, ok = replica.normalSegments[UniqueID(1)]
assert.True(t, ok)