mirror of https://github.com/milvus-io/milvus.git
Remove not inuse stale segments in flushmsg (#20981)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
Branch: pull/20851/head

parent c49f20ea94
commit f844aa0cab
@@ -579,10 +579,9 @@ func (node *DataNode) ReadyToFlush() error {
 // FlushSegments packs flush messages into flowGraph through flushChan.
 //
 // DataCoord calls FlushSegments if the segment is seal&flush only.
 // If DataNode receives a valid segment to flush, new flush message for the segment should be ignored.
 // So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed.
 //
-// One precondition: The segmentID in req is in ascending order.
 func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
     metrics.DataNodeFlushReqCounter.WithLabelValues(
         fmt.Sprint(paramtable.GetNodeID()),
@@ -610,64 +609,50 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
     }

     log.Info("receiving FlushSegments request",
-        zap.Int64("collection ID", req.GetCollectionID()),
-        zap.Int64s("segments", req.GetSegmentIDs()),
+        zap.Int64("collectionID", req.GetCollectionID()),
+        zap.Int64s("sealedSegments", req.GetSegmentIDs()),
     )

-    // TODO: Here and in other places, replace `flushed` param with a more meaningful name.
-    processSegments := func(segmentIDs []UniqueID, flushed bool) ([]UniqueID, bool) {
-        noErr := true
-        var flushedSeg []UniqueID
-        for _, segID := range segmentIDs {
-            // if the segment is already being flushed, skip it.
-            if node.segmentCache.checkIfCached(segID) {
-                logDupFlush(req.GetCollectionID(), segID)
-                continue
-            }
-            // Get the flush channel for the given segment ID.
-            // If no flush channel is found, report an error.
-            flushCh, err := node.flowgraphManager.getFlushCh(segID)
-            if err != nil {
-                errStatus.Reason = "no flush channel found for the segment, unable to flush"
-                log.Error(errStatus.Reason, zap.Int64("segment ID", segID), zap.Error(err))
-                noErr = false
-                continue
-            }
-
-            // Double check that the segment is still not cached.
-            // Skip this flush if segment ID is cached, otherwise cache the segment ID and proceed.
-            exist := node.segmentCache.checkOrCache(segID)
-            if exist {
-                logDupFlush(req.GetCollectionID(), segID)
-                continue
-            }
-            // flushedSeg is only for logging purposes.
-            flushedSeg = append(flushedSeg, segID)
-            // Send the segment to its flush channel.
-            flushCh <- flushMsg{
-                msgID:        req.GetBase().GetMsgID(),
-                timestamp:    req.GetBase().GetTimestamp(),
-                segmentID:    segID,
-                collectionID: req.GetCollectionID(),
-                flushed:      flushed,
-            }
+    segmentIDs := req.GetSegmentIDs()
+    var flushedSeg []UniqueID
+    for _, segID := range segmentIDs {
+        // if the segment is already being flushed, skip it.
+        if node.segmentCache.checkIfCached(segID) {
+            logDupFlush(req.GetCollectionID(), segID)
+            continue
+        }
+        // Get the flush channel for the given segment ID.
+        // If no flush channel is found, report an error.
+        flushCh, err := node.flowgraphManager.getFlushCh(segID)
+        if err != nil {
+            errStatus.Reason = "no flush channel found for the segment, unable to flush"
+            log.Error(errStatus.Reason, zap.Int64("segmentID", segID), zap.Error(err))
+            return errStatus, nil
+        }
+
+        // Double check that the segment is still not cached.
+        // Skip this flush if segment ID is cached, otherwise cache the segment ID and proceed.
+        exist := node.segmentCache.checkOrCache(segID)
+        if exist {
+            logDupFlush(req.GetCollectionID(), segID)
+            continue
+        }
+        // flushedSeg is only for logging purposes.
+        flushedSeg = append(flushedSeg, segID)
+        // Send the segment to its flush channel.
+        flushCh <- flushMsg{
+            msgID:        req.GetBase().GetMsgID(),
+            timestamp:    req.GetBase().GetTimestamp(),
+            segmentID:    segID,
+            collectionID: req.GetCollectionID(),
+        }
     }

-        log.Info("flow graph flushSegment tasks triggered",
-            zap.Bool("flushed", flushed),
-            zap.Int64("collection ID", req.GetCollectionID()),
-            zap.Int64s("segments sending to flush channel", flushedSeg))
-        return flushedSeg, noErr
-    }
-
-    seg, noErr1 := processSegments(req.GetSegmentIDs(), true)
     // Log success flushed segments.
-    if len(seg) > 0 {
+    if len(flushedSeg) > 0 {
         log.Info("sending segments to flush channel",
-            zap.Any("newly sealed segment IDs", seg))
-    }
-    // Fail FlushSegments call if at least one segment fails to get flushed.
-    if !noErr1 {
-        return errStatus, nil
+            zap.Int64("collectionID", req.GetCollectionID()),
+            zap.Int64s("sealedSegments", flushedSeg))
     }

     metrics.DataNodeFlushReqCounter.WithLabelValues(
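The new body inlines what processSegments used to do, with two simplifications: every incoming segment is treated as sealed (so no flushed flag is passed along), and a missing flush channel now fails the whole call instead of being accumulated in noErr. The surviving core is a dedupe-then-send loop. A minimal, self-contained Go sketch of that pattern follows; the segmentCache type, its internals, and the channel setup are simplified stand-ins for the real milvus types, not the actual implementation:

    package main

    import (
        "fmt"
        "sync"
    )

    type UniqueID = int64

    // flushMsg mirrors the simplified message: the `flushed` flag is gone,
    // because every message on the channel now means "seal & flush".
    type flushMsg struct {
        segmentID    UniqueID
        collectionID UniqueID
    }

    // segmentCache is a stand-in for node.segmentCache: a concurrent set
    // of segment IDs whose flush is already in progress.
    type segmentCache struct {
        mu  sync.Mutex
        ids map[UniqueID]struct{}
    }

    // checkOrCache reports whether segID was already cached; if not, it
    // caches segID so concurrent duplicate requests are rejected.
    func (c *segmentCache) checkOrCache(segID UniqueID) bool {
        c.mu.Lock()
        defer c.mu.Unlock()
        if _, ok := c.ids[segID]; ok {
            return true
        }
        c.ids[segID] = struct{}{}
        return false
    }

    func main() {
        cache := &segmentCache{ids: make(map[UniqueID]struct{})}
        flushCh := make(chan flushMsg, 16)

        // Duplicate segment IDs reach the flush channel only once.
        for _, segID := range []UniqueID{1, 2, 2, 3} {
            if cache.checkOrCache(segID) {
                fmt.Printf("duplicate flush of segment %d, skipped\n", segID)
                continue
            }
            flushCh <- flushMsg{segmentID: segID, collectionID: 100}
        }
        close(flushCh)
        for msg := range flushCh {
            fmt.Printf("flushing segment %d\n", msg.segmentID)
        }
    }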
@@ -225,7 +225,7 @@ func (ibNode *insertBufferNode) GetBuffer(segID UniqueID) *BufferData {
 }

 // CollectSegmentsToSync collects segments from flushChan from DataCoord
-func (ibNode *insertBufferNode) CollectSegmentsToSync() (flushedSegments, staleSegments []UniqueID) {
+func (ibNode *insertBufferNode) CollectSegmentsToSync() (flushedSegments []UniqueID) {
     var (
         maxBatch    = 10
         targetBatch int
@@ -240,23 +240,18 @@ func (ibNode *insertBufferNode) CollectSegmentsToSync() (flushedSegments, staleSegments []UniqueID) {

     for i := 1; i <= targetBatch; i++ {
         fmsg := <-ibNode.flushChan
-        if fmsg.flushed {
-            flushedSegments = append(flushedSegments, fmsg.segmentID)
-        } else {
-            staleSegments = append(staleSegments, fmsg.segmentID)
-        }
+        flushedSegments = append(flushedSegments, fmsg.segmentID)
     }

     if targetBatch > 0 {
         log.Info("(Manual Sync) batch processing flush messages",
             zap.Int("batchSize", targetBatch),
             zap.Int64s("flushedSegments", flushedSegments),
-            zap.Int64s("staleSegments", staleSegments),
             zap.String("channel", ibNode.channelName),
         )
     }

-    return flushedSegments, staleSegments
+    return flushedSegments
 }

 // DisplayStatistics logs the statistic changes of segment in mem
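CollectSegmentsToSync drains at most maxBatch pending messages from the flush channel in a single pass and, after this change, returns one slice instead of two. A self-contained sketch of that bounded, non-blocking drain pattern follows; computing targetBatch from len() on the channel is an assumption about the surrounding code, which the hunk does not show:

    package main

    import "fmt"

    type UniqueID = int64

    type flushMsg struct {
        segmentID UniqueID
    }

    // collectSegmentsToSync drains at most maxBatch pending messages from
    // flushChan without blocking, mirroring the batched-drain pattern.
    func collectSegmentsToSync(flushChan chan flushMsg, maxBatch int) (flushedSegments []UniqueID) {
        // Snapshot the queue length first so an empty channel never blocks.
        targetBatch := len(flushChan)
        if targetBatch > maxBatch {
            targetBatch = maxBatch
        }
        for i := 1; i <= targetBatch; i++ {
            fmsg := <-flushChan
            flushedSegments = append(flushedSegments, fmsg.segmentID)
        }
        return flushedSegments
    }

    func main() {
        ch := make(chan flushMsg, 16)
        for i := 0; i < 15; i++ {
            ch <- flushMsg{segmentID: UniqueID(i)}
        }
        // Only the first 10 (maxBatch) are drained per pass.
        fmt.Println(collectSegmentsToSync(ch, 10)) // [0 1 2 3 4 5 6 7 8 9]
        fmt.Println(collectSegmentsToSync(ch, 10)) // [10 11 12 13 14]
    }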
@@ -371,8 +366,7 @@ func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload
         }
     }

-    flushedSegments, staleSegments := ibNode.CollectSegmentsToSync()
-    mergeSyncTask(staleSegments, syncTasks, func(task *syncTask) {})
+    flushedSegments := ibNode.CollectSegmentsToSync()
     mergeSyncTask(flushedSegments, syncTasks, func(task *syncTask) {
         task.flushed = true
     })
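With staleSegments gone, FillInSyncTasks makes a single mergeSyncTask call that always marks the task as flushed. A sketch of how such a merge helper plausibly works follows; the syncTask fields and the upsert behavior are assumptions inferred from the calls visible in the diff, not the real implementation:

    package main

    import "fmt"

    type UniqueID = int64

    // syncTask is a simplified stand-in for the flowgraph sync task.
    type syncTask struct {
        segmentID UniqueID
        flushed   bool
    }

    // mergeSyncTask upserts one task per segment ID and applies setupTask
    // to it, mirroring the option-callback pattern used by FillInSyncTasks.
    func mergeSyncTask(segmentIDs []UniqueID, syncTasks map[UniqueID]*syncTask, setupTask func(task *syncTask)) {
        for _, segID := range segmentIDs {
            task, ok := syncTasks[segID]
            if !ok {
                task = &syncTask{segmentID: segID}
                syncTasks[segID] = task
            }
            setupTask(task)
        }
    }

    func main() {
        syncTasks := make(map[UniqueID]*syncTask)
        // One merge call remains, and it always sets flushed.
        mergeSyncTask([]UniqueID{1, 2, 3}, syncTasks, func(task *syncTask) {
            task.flushed = true
        })
        for id, task := range syncTasks {
            fmt.Printf("segment %d flushed=%v\n", id, task.flushed)
        }
    }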
@@ -515,10 +515,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
     inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 434}}

     // trigger manual flush
-    flushChan <- flushMsg{
-        segmentID: 10,
-        flushed:   true,
-    }
+    flushChan <- flushMsg{segmentID: 10}

     // trigger auto flush since buffer full
     output := iBNode.Operate([]flowgraph.Msg{iMsg})
@@ -812,7 +809,6 @@ func (s *InsertBufferNodeSuit) TestFillInSyncTasks() {
         msg := flushMsg{
             segmentID:    UniqueID(i),
             collectionID: s.collID,
-            flushed:      i%2 == 0, // segID=2, flushed = true
         }

         flushCh <- msg
@@ -821,13 +817,8 @@ func (s *InsertBufferNodeSuit) TestFillInSyncTasks() {
     syncTasks := node.FillInSyncTasks(&flowGraphMsg{endPositions: []*internalpb.MsgPosition{{Timestamp: 100}}}, nil)
     s.Assert().NotEmpty(syncTasks)

-    for segID, task := range syncTasks {
-        if segID == UniqueID(2) {
-            s.Assert().True(task.flushed)
-        } else {
-            s.Assert().False(task.flushed)
-        }
+    for _, task := range syncTasks {
+        s.Assert().True(task.flushed)
         s.Assert().False(task.auto)
         s.Assert().False(task.dropped)
     }
@@ -845,11 +836,6 @@ func (s *InsertBufferNodeSuit) TestFillInSyncTasks() {
         msg := flushMsg{
             segmentID:    UniqueID(i),
             collectionID: s.collID,
-            flushed:      false,
         }

-        if i == 2 {
-            msg.flushed = true
-        }
-
         flushCh <- msg
@@ -859,13 +845,8 @@ func (s *InsertBufferNodeSuit) TestFillInSyncTasks() {
     s.Assert().NotEmpty(syncTasks)
     s.Assert().Equal(10, len(syncTasks)) // 10 is max batch

-    for segID, task := range syncTasks {
-        if segID == UniqueID(2) {
-            s.Assert().True(task.flushed)
-        } else {
-            s.Assert().False(task.flushed)
-        }
+    for _, task := range syncTasks {
+        s.Assert().True(task.flushed)
         s.Assert().False(task.auto)
         s.Assert().False(task.dropped)
     }
@@ -1074,14 +1055,11 @@ func TestInsertBufferNode_collectSegmentsToSync(t *testing.T) {
         }

         for i := 0; i < test.inFlushMsgNum; i++ {
-            flushCh <- flushMsg{
-                segmentID: UniqueID(i),
-                flushed:   i%2 == 0,
-            }
+            flushCh <- flushMsg{segmentID: UniqueID(i)}
         }

-        flushedSegs, staleSegs := node.CollectSegmentsToSync()
-        assert.Equal(t, test.expectedOutNum, len(flushedSegs)+len(staleSegs))
+        flushedSegs := node.CollectSegmentsToSync()
+        assert.Equal(t, test.expectedOutNum, len(flushedSegs))
         })
     }
 }
@@ -62,7 +62,6 @@ type flushMsg struct {
     timestamp    Timestamp
     segmentID    UniqueID
     collectionID UniqueID
-    flushed      bool
 }

 type resendTTMsg struct {
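After this hunk, any message arriving on flushChan implies a seal-and-flush, so the struct no longer carries the flag at all. A minimal sketch of the resulting message type and its construction; the UniqueID and Timestamp aliases below are assumptions standing in for milvus's typeutil definitions:

    package main

    import "fmt"

    // Assumed aliases: in milvus these come from typeutil.
    type (
        UniqueID  = int64
        Timestamp = uint64
    )

    // flushMsg after #20981: no `flushed bool` field, since every
    // message on flushChan now means "seal & flush".
    type flushMsg struct {
        msgID        UniqueID
        timestamp    Timestamp
        segmentID    UniqueID
        collectionID UniqueID
    }

    func main() {
        // Callers no longer set (or forget to set) a flushed flag.
        msg := flushMsg{msgID: 1, timestamp: 434, segmentID: 10, collectionID: 100}
        fmt.Printf("%+v\n", msg)
    }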