refine log out for datanode(#25763) (#26195)

Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
pull/26217/head
MrPresent-Han 2023-08-08 16:49:08 +08:00 committed by GitHub
parent 09cf8700a7
commit b84f5c560a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 15 additions and 9 deletions

View File

@ -595,10 +595,11 @@ func (c *ChannelMeta) updateSegmentRowNumber(segID UniqueID, numRows int64) {
c.segMu.Lock() c.segMu.Lock()
defer c.segMu.Unlock() defer c.segMu.Unlock()
log.Info("updating segment num row", zap.Int64("segmentID", segID), zap.Int64("numRows", numRows))
seg, ok := c.segments[segID] seg, ok := c.segments[segID]
if ok && seg.notFlushed() { if ok && seg.notFlushed() {
seg.numRows += numRows seg.numRows += numRows
log.Info("updated segment num row", zap.Int64("segmentID", segID),
zap.Int64("addedNumRows", numRows), zap.Int64("numRowsSum", seg.numRows))
return return
} }
@ -620,11 +621,11 @@ func (c *ChannelMeta) updateSingleSegmentMemorySize(segID UniqueID) {
memorySize := c.calculateSegmentMemorySize(segID) memorySize := c.calculateSegmentMemorySize(segID)
c.segMu.Lock() c.segMu.Lock()
defer c.segMu.Unlock() defer c.segMu.Unlock()
log.Info("updating segment memorySize", zap.Int64("segmentID", segID),
zap.Int64("memorySize", memorySize))
seg, ok := c.segments[segID] seg, ok := c.segments[segID]
if ok { if ok {
seg.memorySize = memorySize seg.memorySize = memorySize
log.Info("updated segment memorySize", zap.Int64("segmentID", segID),
zap.Int64("memorySize", seg.memorySize))
return return
} }

View File

@ -463,17 +463,17 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
zap.Any("position", endPosition), zap.Any("position", endPosition),
zap.String("channel", ibNode.channelName), zap.String("channel", ibNode.channelName),
) )
// check if task pool is full
if !task.dropped && !task.flushed && ibNode.flushManager.isFull() {
log.RatedWarn(10, "task pool is full, skip it")
continue
}
// check if segment is syncing // check if segment is syncing
segment := ibNode.channel.getSegment(task.segmentID) segment := ibNode.channel.getSegment(task.segmentID)
if !task.dropped && !task.flushed && segment.isSyncing() { if !task.dropped && !task.flushed && segment.isSyncing() {
log.RatedInfo(10, "segment is syncing, skip it") log.RatedInfo(10, "segment is syncing, skip it")
continue continue
} }
// check if task pool is full
if !task.dropped && !task.flushed && ibNode.flushManager.isFull() {
log.RatedWarn(10, "task pool is full, skip it")
continue
}
segment.setSyncing(true) segment.setSyncing(true)
log.Info("insertBufferNode start syncing bufferData") log.Info("insertBufferNode start syncing bufferData")
// use the flushed pk stats to take current stat // use the flushed pk stats to take current stat

View File

@ -91,7 +91,12 @@ func (fm *flowgraphManager) execute(totalMemory uint64) {
return return
} }
if float64(total) < float64(totalMemory)*Params.DataNodeCfg.MemoryWatermark.GetAsFloat() { memoryWatermark := float64(totalMemory) * Params.DataNodeCfg.MemoryWatermark.GetAsFloat()
if float64(total) < memoryWatermark {
log.RatedDebug(5, "skip force sync because memory level is not high enough",
zap.Float64("current_total_memory_usage", float64(total)),
zap.Float64("current_memory_watermark", memoryWatermark),
zap.Any("channel_memory_usages", channels))
return return
} }