mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: MrPresent-Han <chun.han@zilliz.com>pull/26132/head
parent
241117dd6d
commit
fb933fe64d
|
@ -83,7 +83,7 @@ type Channel interface {
|
|||
setSegmentLastSyncTs(segID UniqueID, ts Timestamp)
|
||||
|
||||
updateSegmentRowNumber(segID UniqueID, numRows int64)
|
||||
updateSegmentMemorySize(segID UniqueID, memorySize int64)
|
||||
updateSingleSegmentMemorySize(segID UniqueID)
|
||||
getSegmentStatisticsUpdates(segID UniqueID) (*commonpb.SegmentStats, error)
|
||||
segmentFlushed(segID UniqueID)
|
||||
|
||||
|
@ -274,7 +274,7 @@ func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {
|
|||
|
||||
validSegs := make([]*Segment, 0)
|
||||
for _, seg := range c.segments {
|
||||
if !seg.isValid() {
|
||||
if seg.getType() == datapb.SegmentType_Flushed || seg.getType() == datapb.SegmentType_Compacted {
|
||||
continue
|
||||
}
|
||||
validSegs = append(validSegs, seg)
|
||||
|
@ -295,6 +295,10 @@ func (c *ChannelMeta) setSegmentLastSyncTs(segID UniqueID, ts Timestamp) {
|
|||
defer c.segMu.Unlock()
|
||||
if _, ok := c.segments[segID]; ok {
|
||||
c.segments[segID].lastSyncTs = ts
|
||||
tsTime, _ := tsoutil.ParseTS(ts)
|
||||
log.Debug("Set last syncTs for segment", zap.Int64("segmentID", segID), zap.Time("ts", tsTime))
|
||||
} else {
|
||||
log.Warn("Wrong! Try to set lastSync ts for non-existing segment", zap.Int64("segmentID", segID))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -601,14 +605,25 @@ func (c *ChannelMeta) updateSegmentRowNumber(segID UniqueID, numRows int64) {
|
|||
log.Warn("update segment num row not exist", zap.Int64("segmentID", segID))
|
||||
}
|
||||
|
||||
// updateStatistics updates the number of rows of a segment in channel.
|
||||
func (c *ChannelMeta) updateSegmentMemorySize(segID UniqueID, memorySize int64) {
|
||||
func (c *ChannelMeta) calculateSegmentMemorySize(segID UniqueID) int64 {
|
||||
var memorySize int64
|
||||
if ibf, ok := c.getCurInsertBuffer(segID); ok {
|
||||
memorySize += ibf.memorySize()
|
||||
}
|
||||
if dbf, ok := c.getCurDeleteBuffer(segID); ok {
|
||||
memorySize += dbf.GetMemorySize()
|
||||
}
|
||||
return memorySize
|
||||
}
|
||||
|
||||
func (c *ChannelMeta) updateSingleSegmentMemorySize(segID UniqueID) {
|
||||
memorySize := c.calculateSegmentMemorySize(segID)
|
||||
c.segMu.Lock()
|
||||
defer c.segMu.Unlock()
|
||||
|
||||
log.Info("updating segment memorySize", zap.Int64("segmentID", segID), zap.Int64("memorySize", memorySize))
|
||||
log.Info("updating segment memorySize", zap.Int64("segmentID", segID),
|
||||
zap.Int64("memorySize", memorySize))
|
||||
seg, ok := c.segments[segID]
|
||||
if ok && seg.notFlushed() {
|
||||
if ok {
|
||||
seg.memorySize = memorySize
|
||||
return
|
||||
}
|
||||
|
|
|
@ -152,7 +152,7 @@ func TestChannelMeta_InnerFunction(t *testing.T) {
|
|||
|
||||
channel.updateSegmentRowNumber(0, 10)
|
||||
assert.Equal(t, int64(10), seg.numRows)
|
||||
channel.updateSegmentMemorySize(0, 10)
|
||||
channel.getSegment(0).memorySize = 10
|
||||
assert.Equal(t, int64(10), seg.memorySize)
|
||||
|
||||
segPos := channel.listNewSegmentsStartPositions()
|
||||
|
|
|
@ -298,14 +298,7 @@ func (ibNode *insertBufferNode) DisplayStatistics(seg2Upload []UniqueID) {
|
|||
// updateSegmentsMemorySize updates segments' memory size in channel meta
|
||||
func (ibNode *insertBufferNode) updateSegmentsMemorySize(seg2Upload []UniqueID) {
|
||||
for _, segID := range seg2Upload {
|
||||
var memorySize int64
|
||||
if buffer, ok := ibNode.channel.getCurInsertBuffer(segID); ok {
|
||||
memorySize += buffer.memorySize()
|
||||
}
|
||||
if buffer, ok := ibNode.channel.getCurDeleteBuffer(segID); ok {
|
||||
memorySize += buffer.GetLogSize()
|
||||
}
|
||||
ibNode.channel.updateSegmentMemorySize(segID, memorySize)
|
||||
ibNode.channel.updateSingleSegmentMemorySize(segID)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -482,7 +475,7 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
|
|||
continue
|
||||
}
|
||||
segment.setSyncing(true)
|
||||
log.Info("insertBufferNode syncing BufferData")
|
||||
log.Info("insertBufferNode start syncing bufferData")
|
||||
// use the flushed pk stats to take current stat
|
||||
var pkStats *storage.PrimaryKeyStats
|
||||
// TODO, this has to be async flush, no need to block here.
|
||||
|
@ -519,6 +512,7 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
|
|||
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.TotalLabel).Inc()
|
||||
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc()
|
||||
}
|
||||
log.Info("insertBufferNode finish submitting syncing bufferData")
|
||||
}
|
||||
return segmentsToSync
|
||||
}
|
||||
|
|
|
@ -222,7 +222,7 @@ func TestFlowGraphManager(t *testing.T) {
|
|||
assert.True(t, ok)
|
||||
err = fg.channel.addSegment(addSegmentReq{segID: 0})
|
||||
assert.NoError(t, err)
|
||||
fg.channel.updateSegmentMemorySize(0, memorySize)
|
||||
fg.channel.getSegment(0).memorySize = memorySize
|
||||
fg.channel.(*ChannelMeta).needToSync.Store(false)
|
||||
}
|
||||
fm.execute(test.totalMemory)
|
||||
|
|
|
@ -957,6 +957,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
|
|||
dsService.channel.evictHistoryInsertBuffer(req.GetSegmentID(), pack.pos)
|
||||
dsService.channel.evictHistoryDeleteBuffer(req.GetSegmentID(), pack.pos)
|
||||
segment := dsService.channel.getSegment(req.GetSegmentID())
|
||||
dsService.channel.updateSingleSegmentMemorySize(req.GetSegmentID())
|
||||
segment.setSyncing(false)
|
||||
// dsService.channel.saveBinlogPath(fieldStats)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue