Add Flush segment optimization

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-03-24 14:43:38 +08:00 committed by yefu.chen
parent 8464e8d447
commit 972dad4d8e
1 changed files with 16 additions and 11 deletions

View File

@ -85,6 +85,7 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic
if err != nil {
return err
}
coll2Segs := make(map[UniqueID][]UniqueID)
for _, id := range segments {
expired, err := watcher.allocator.IsAllocationsExpired(ctx, id, msg.Base.Timestamp)
if err != nil {
@ -92,7 +93,7 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic
continue
}
if expired {
segmentInfo, err := watcher.meta.GetSegment(id)
sInfo, err := watcher.meta.GetSegment(id)
if err != nil {
log.Error("get segment from meta error", zap.Int64("segmentID", id), zap.Error(err))
continue
@ -101,18 +102,22 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic
log.Error("set segment state error", zap.Int64("segmentID", id), zap.Error(err))
continue
}
watcher.cluster.FlushSegment(&datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Flush,
MsgID: -1, // todo add msg id
Timestamp: 0, // todo
SourceID: Params.NodeID,
},
CollectionID: segmentInfo.CollectionID,
SegmentIDs: []int64{segmentInfo.SegmentID},
})
collID, segID := sInfo.CollectionID, sInfo.SegmentID
coll2Segs[collID] = append(coll2Segs[collID], segID)
watcher.allocator.DropSegment(ctx, id)
}
}
for collID, segIDs := range coll2Segs {
watcher.cluster.FlushSegment(&datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Flush,
MsgID: -1, // todo add msg id
Timestamp: 0, // todo
SourceID: Params.NodeID,
},
CollectionID: collID,
SegmentIDs: segIDs,
})
}
return nil
}