Make DN able to process syncing request in batch (#20126)

See also: #19834

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

XuanYang-cn 2022-10-27 21:25:33 +08:00 committed by GitHub
parent 404fc68afa
commit d5bc4e2585
6 changed files with 373 additions and 322 deletions
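
The core of this change: instead of popping a single flush message from flushChan per Operate call, the insert buffer node now drains the channel in bounded batches (at most 10 messages at a time) and turns them into sync tasks. Below is a minimal, standalone sketch of that drain pattern; it reuses the flushMsg and maxBatch names from the diff but with simplified types, and is not the actual implementation.

package main

import "fmt"

type UniqueID = int64

type flushMsg struct {
	segmentID UniqueID
	flushed   bool
}

// collectSegmentsToSync drains at most maxBatch buffered flush messages and
// splits the segment IDs into flushed and stale groups, never blocking.
func collectSegmentsToSync(flushChan chan flushMsg) (flushed, stale []UniqueID) {
	const maxBatch = 10
	target := len(flushChan) // only read what is already buffered
	if target > maxBatch {
		target = maxBatch
	}
	for i := 0; i < target; i++ {
		m := <-flushChan
		if m.flushed {
			flushed = append(flushed, m.segmentID)
		} else {
			stale = append(stale, m.segmentID)
		}
	}
	return flushed, stale
}

func main() {
	ch := make(chan flushMsg, 100)
	for i := 1; i <= 20; i++ {
		ch <- flushMsg{segmentID: UniqueID(i), flushed: i%2 == 0}
	}
	f, s := collectSegmentsToSync(ch)
	fmt.Println(len(f) + len(s)) // 10: capped at maxBatch per call
}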

@ -189,14 +189,14 @@ func (c *ChannelMeta) addSegment(req addSegmentReq) error {
return fmt.Errorf("mismatch collection, ID=%d", req.collID)
}
log.Info("adding segment",
zap.String("segment type", req.segType.String()),
zap.Int64("segment ID", req.segID),
zap.Int64("collection ID", req.collID),
zap.Int64("partition ID", req.partitionID),
zap.String("channel name", c.channelName),
zap.Any("start position", req.startPos),
zap.Any("end position", req.endPos),
zap.Uint64("recover ts", req.recoverTs),
zap.String("type", req.segType.String()),
zap.Int64("segmentID", req.segID),
zap.Int64("collectionID", req.collID),
zap.Int64("partitionID", req.partitionID),
zap.String("channel", c.channelName),
zap.Any("startPosition", req.startPos),
zap.Any("endPosition", req.endPos),
zap.Uint64("recoverTs", req.recoverTs),
zap.Bool("importing", req.importing),
)
seg := &Segment{

@ -96,7 +96,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
}
// update compacted segment before operation
if len(fgMsg.deleteMessages) > 0 || len(fgMsg.segmentsToFlush) > 0 {
if len(fgMsg.deleteMessages) > 0 || len(fgMsg.segmentsToSync) > 0 {
dn.updateCompactedSegments()
}
@ -122,11 +122,11 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
}
// process flush messages
if len(fgMsg.segmentsToFlush) > 0 {
if len(fgMsg.segmentsToSync) > 0 {
log.Info("DeleteNode receives flush message",
zap.Int64s("segIDs", fgMsg.segmentsToFlush),
zap.Int64s("segIDs", fgMsg.segmentsToSync),
zap.String("vChannelName", dn.channelName))
for _, segmentToFlush := range fgMsg.segmentsToFlush {
for _, segmentToFlush := range fgMsg.segmentsToSync {
buf, ok := dn.delBuf.Load(segmentToFlush)
if !ok {
// no related delta data to flush, send empty buf to complete flush life-cycle

@ -235,7 +235,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
assert.Nil(te, err)
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
msg.segmentsToFlush = segIDs
msg.segmentsToSync = segIDs
// this will fail since ts = 0 will trigger mocked error
var fgMsg flowgraph.Msg = &msg
delNode.Operate([]flowgraph.Msg{fgMsg})
@ -258,7 +258,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
assert.Nil(te, err)
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
msg.segmentsToFlush = segIDs
msg.segmentsToSync = segIDs
msg.endPositions[0].Timestamp = 100 // set to normal timestamp
var fgMsg flowgraph.Msg = &msg
@ -288,7 +288,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
assert.Nil(t, err)
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
msg.segmentsToFlush = segIDs
msg.segmentsToSync = segIDs
msg.endPositions[0].Timestamp = 100 // set to normal timestamp
msg.dropCollection = true
@ -322,7 +322,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
assert.Nil(te, err)
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
msg.segmentsToFlush = []UniqueID{-1}
msg.segmentsToSync = []UniqueID{-1}
delNode.delBuf.Store(UniqueID(-1), &DelDataBuf{})
delNode.flushManager = &mockFlushManager{
returnError: true,
@ -367,7 +367,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
msg.deleteMessages = []*msgstream.DeleteMsg{}
msg.segmentsToFlush = []UniqueID{compactedSegment}
msg.segmentsToSync = []UniqueID{compactedSegment}
delNode.delBuf.Store(compactedSegment, &DelDataBuf{delData: &DeleteData{}})
delNode.flushManager = NewRendezvousFlushManager(&allocator{}, cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)

@ -171,17 +171,17 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
ibNode.DisplayStatistics(seg2Upload)
segmentsToFlush := ibNode.Flush(fgMsg, seg2Upload, endPositions[0])
segmentsToSync := ibNode.Sync(fgMsg, seg2Upload, endPositions[0])
ibNode.WriteTimeTick(fgMsg.timeRange.timestampMax, seg2Upload)
res := flowGraphMsg{
deleteMessages: fgMsg.deleteMessages,
timeRange: fgMsg.timeRange,
startPositions: fgMsg.startPositions,
endPositions: fgMsg.endPositions,
segmentsToFlush: segmentsToFlush,
dropCollection: fgMsg.dropCollection,
deleteMessages: fgMsg.deleteMessages,
timeRange: fgMsg.timeRange,
startPositions: fgMsg.startPositions,
endPositions: fgMsg.endPositions,
segmentsToSync: segmentsToSync,
dropCollection: fgMsg.dropCollection,
}
for _, sp := range spans {
@ -211,6 +211,58 @@ func (ibNode *insertBufferNode) verifyInMsg(in []Msg) (*flowGraphMsg, bool) {
return fgMsg, ok
}
func (ibNode *insertBufferNode) GetBufferIfFull(segID UniqueID) (*BufferData, bool) {
if bd, ok := ibNode.insertBuffer.Load(segID); ok && bd.(*BufferData).effectiveCap() <= 0 {
return bd.(*BufferData), true
}
return nil, false
}
// GetBuffer returns the buffer data of a segment, or nil if the segment is not buffered
func (ibNode *insertBufferNode) GetBuffer(segID UniqueID) *BufferData {
var buf *BufferData
if bd, ok := ibNode.insertBuffer.Load(segID); ok {
buf = bd.(*BufferData)
}
return buf
}
// CollectSegmentsToSync collects the segment IDs to sync from the flush messages DataCoord sent through flushChan
func (ibNode *insertBufferNode) CollectSegmentsToSync() (flushedSegments, staleSegments []UniqueID) {
var (
maxBatch = 10
targetBatch int
)
size := len(ibNode.flushChan)
if size >= maxBatch {
targetBatch = maxBatch
} else {
targetBatch = size
}
for i := 1; i <= targetBatch; i++ {
fmsg := <-ibNode.flushChan
if fmsg.flushed {
flushedSegments = append(flushedSegments, fmsg.segmentID)
} else {
staleSegments = append(staleSegments, fmsg.segmentID)
}
}
if targetBatch > 0 {
log.Info("(Manual Sync) batch processing flush messages",
zap.Int("batchSize", targetBatch),
zap.Int64s("flushedSegments", flushedSegments),
zap.Int64s("staleSegments", staleSegments),
zap.String("channel", ibNode.channelName),
)
}
return flushedSegments, staleSegments
}
// DisplayStatistics logs the statistic changes of segment in mem
func (ibNode *insertBufferNode) DisplayStatistics(seg2Upload []UniqueID) {
// Find and return the smaller input
@ -221,24 +273,22 @@ func (ibNode *insertBufferNode) DisplayStatistics(seg2Upload []UniqueID) {
return latter
}
// limit the logging size
displaySize := min(10, len(seg2Upload))
// Log the segment statistics in mem
for k, segID := range seg2Upload[:displaySize] {
bd, ok := ibNode.insertBuffer.Load(segID)
if !ok {
continue
if bd, ok := ibNode.insertBuffer.Load(segID); ok {
log.Info("segment buffer status",
zap.Int("no.", k),
zap.Int64("segmentID", segID),
zap.String("channel", ibNode.channelName),
zap.Int64("size", bd.(*BufferData).size),
zap.Int64("limit", bd.(*BufferData).limit))
}
log.Info("insert seg buffer status", zap.Int("No.", k),
zap.Int64("segmentID", segID),
zap.String("vchannel name", ibNode.channelName),
zap.Int64("buffer size", bd.(*BufferData).size),
zap.Int64("buffer limit", bd.(*BufferData).limit))
}
}
type flushTask struct {
type syncTask struct {
buffer *BufferData
segmentID UniqueID
flushed bool
@ -246,127 +296,107 @@ type flushTask struct {
auto bool
}
func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload []UniqueID) []flushTask {
var flushTasks []flushTask
func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload []UniqueID) map[UniqueID]*syncTask {
var syncTasks = make(map[UniqueID]*syncTask)
if fgMsg.dropCollection {
// All segments in the collection will be synced, no matter whether their buffers are empty or not
segmentIDs := ibNode.channel.listAllSegmentIDs()
log.Info("Receive drop collection request and flushing all segments",
zap.Any("segments", segmentIDs),
log.Info("Receive drop collection request and syncing all segments",
zap.Int64s("segments", segmentIDs),
zap.String("channel", ibNode.channelName),
)
flushTasks = make([]flushTask, 0, len(segmentIDs))
for _, seg2Flush := range segmentIDs {
var buf *BufferData
bd, ok := ibNode.insertBuffer.Load(seg2Flush)
if !ok {
buf = nil
} else {
buf = bd.(*BufferData)
}
flushTasks = append(flushTasks, flushTask{
buffer: buf,
segmentID: seg2Flush,
for _, segID := range segmentIDs {
buf := ibNode.GetBuffer(segID)
syncTasks[segID] = &syncTask{
buffer: buf, // nil is valid
segmentID: segID,
flushed: false,
dropped: true,
})
}
}
return flushTasks
return syncTasks
}
// Auto Syncing
for _, segToFlush := range seg2Upload {
if bd, ok := ibNode.insertBuffer.Load(segToFlush); ok && bd.(*BufferData).effectiveCap() <= 0 {
log.Info("(Auto Flush)",
zap.Int64("segment id", segToFlush),
zap.String("vchannel name", ibNode.channelName),
)
ibuffer := bd.(*BufferData)
// Auto Sync
for _, segID := range seg2Upload {
if ibuffer, ok := ibNode.GetBufferIfFull(segID); ok {
log.Info("(Auto Sync)",
zap.Int64("segmentID", segID),
zap.Int64("numRows", ibuffer.size),
zap.Int64("limit", ibuffer.limit),
zap.String("channel", ibNode.channelName))
flushTasks = append(flushTasks, flushTask{
syncTasks[segID] = &syncTask{
buffer: ibuffer,
segmentID: segToFlush,
segmentID: segID,
flushed: false,
dropped: false,
auto: true,
})
}
}
}
mergeFlushTask := func(segmentID UniqueID, setupTask func(task *flushTask)) {
// Merge auto & manual flush tasks with the same segment ID.
dup := false
for i, task := range flushTasks {
if task.segmentID == segmentID {
log.Info("merging flush task, updating flushed flag",
zap.Int64("segment ID", segmentID))
setupTask(&flushTasks[i])
dup = true
break
mergeSyncTask := func(segmentIDs []UniqueID, syncTasks map[UniqueID]*syncTask, setupTask func(task *syncTask)) {
// Merge auto & manual sync tasks with the same segment ID.
for _, segmentID := range segmentIDs {
if task, ok := syncTasks[segmentID]; ok {
setupTask(task)
log.Info("merging sync task, updating flushed flag",
zap.Int64("segmentID", segmentID),
zap.Bool("flushed", task.flushed),
zap.Bool("dropped", task.dropped),
)
return
}
}
// Load the buffer and create a new sync task if there's no existing task for this segment.
if !dup {
bd, ok := ibNode.insertBuffer.Load(segmentID)
var buf *BufferData
if ok {
buf = bd.(*BufferData)
}
task := flushTask{
buffer: buf,
buf := ibNode.GetBuffer(segmentID)
task := syncTask{
buffer: buf, // nil is valid
segmentID: segmentID,
dropped: false,
}
setupTask(&task)
flushTasks = append(flushTasks, task)
syncTasks[segmentID] = &task
}
}
// Manual Syncing
select {
case fmsg := <-ibNode.flushChan:
log.Info("(Manual Flush) receiving flush message",
zap.Int64("segmentID", fmsg.segmentID),
zap.Int64("collectionID", fmsg.collectionID),
zap.Bool("flushed", fmsg.flushed),
zap.String("v-channel name", ibNode.channelName),
)
mergeFlushTask(fmsg.segmentID, func(task *flushTask) {
task.flushed = fmsg.flushed
})
default:
}
flushedSegments, staleSegments := ibNode.CollectSegmentsToSync()
mergeSyncTask(staleSegments, syncTasks, func(task *syncTask) {})
mergeSyncTask(flushedSegments, syncTasks, func(task *syncTask) {
task.flushed = true
})
// process drop partition
for _, partitionDrop := range fgMsg.dropPartitions {
segmentIDs := ibNode.channel.listPartitionSegments(partitionDrop)
log.Info("(Drop Partition) process drop partition",
log.Info("(Drop Partition) syncing all segments in the partition",
zap.Int64("collectionID", ibNode.channel.getCollectionID()),
zap.Int64("partitionID", partitionDrop),
zap.Int64s("segmentIDs", segmentIDs),
zap.String("v-channel name", ibNode.channelName),
zap.String("channel", ibNode.channelName),
)
for _, segID := range segmentIDs {
mergeFlushTask(segID, func(task *flushTask) {
task.flushed = true
task.dropped = true
})
}
mergeSyncTask(segmentIDs, syncTasks, func(task *syncTask) {
task.flushed = true
task.dropped = true
})
}
return flushTasks
return syncTasks
}
func (ibNode *insertBufferNode) Flush(fgMsg *flowGraphMsg, seg2Upload []UniqueID, endPosition *internalpb.MsgPosition) []UniqueID {
flushTasks := ibNode.FillInSyncTasks(fgMsg, seg2Upload)
segmentsToFlush := make([]UniqueID, 0, len(flushTasks))
func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID, endPosition *internalpb.MsgPosition) []UniqueID {
syncTasks := ibNode.FillInSyncTasks(fgMsg, seg2Upload)
segmentsToSync := make([]UniqueID, 0, len(syncTasks))
for _, task := range flushTasks {
log.Info("insertBufferNode flushing BufferData",
for _, task := range syncTasks {
log.Info("insertBufferNode syncing BufferData",
zap.Int64("segmentID", task.segmentID),
zap.Bool("flushed", task.flushed),
zap.Bool("dropped", task.dropped),
zap.Bool("auto", task.auto),
zap.Any("position", endPosition),
zap.String("channel", ibNode.channelName),
)
segStats, err := ibNode.channel.getSegmentStatslog(task.segmentID)
@ -394,7 +424,7 @@ func (ibNode *insertBufferNode) Flush(fgMsg *flowGraphMsg, seg2Upload []UniqueID
log.Error(err.Error())
panic(err)
}
segmentsToFlush = append(segmentsToFlush, task.segmentID)
segmentsToSync = append(segmentsToSync, task.segmentID)
ibNode.insertBuffer.Delete(task.segmentID)
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.SuccessLabel).Inc()
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc()
@ -403,7 +433,7 @@ func (ibNode *insertBufferNode) Flush(fgMsg *flowGraphMsg, seg2Upload []UniqueID
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc()
}
}
return segmentsToFlush
return segmentsToSync
}
// updateSegmentStates updates statistics in channel meta for the segments in insertMsgs.
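
FillInSyncTasks now returns its tasks in a map keyed by segment ID, so a manual flush or a dropped partition merges into an existing auto-sync task with one lookup instead of the old slice scan. A minimal, standalone sketch of that merge step follows, again with simplified types and names taken from the diff above rather than the actual implementation.

package main

import "fmt"

type UniqueID = int64

type syncTask struct {
	segmentID UniqueID
	flushed   bool
	dropped   bool
	auto      bool
}

// mergeSyncTasks updates an existing task for each segment ID, or creates a
// new one, so duplicate segments end up with a single merged sync task.
func mergeSyncTasks(segIDs []UniqueID, tasks map[UniqueID]*syncTask, setup func(*syncTask)) {
	for _, id := range segIDs {
		if t, ok := tasks[id]; ok {
			setup(t) // merge into the existing (e.g. auto-sync) task
			continue
		}
		t := &syncTask{segmentID: id}
		setup(t)
		tasks[id] = t
	}
}

func main() {
	tasks := map[UniqueID]*syncTask{
		1: {segmentID: 1, auto: true}, // segment 1 already has a full buffer
	}
	// Segments 1 and 2 were flushed manually; 1 merges, 2 is created.
	mergeSyncTasks([]UniqueID{1, 2}, tasks, func(t *syncTask) { t.flushed = true })
	fmt.Println(len(tasks), tasks[1].auto, tasks[1].flushed) // 2 true true
}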

@ -40,6 +40,7 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
var insertNodeTestDir = "/tmp/milvus_test/insert_node"
@ -420,10 +421,10 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
// Trigger auto flush
output := iBNode.Operate([]flowgraph.Msg{iMsg})
fgm := output[0].(*flowGraphMsg)
wg.Add(len(fgm.segmentsToFlush))
t.Log("segments to flush", fgm.segmentsToFlush)
wg.Add(len(fgm.segmentsToSync))
t.Log("segments to flush", fgm.segmentsToSync)
for _, im := range fgm.segmentsToFlush {
for _, im := range fgm.segmentsToSync {
// send del done signal
err = fm.flushDelData(nil, im, fgm.endPositions[0])
assert.NoError(t, err)
@ -522,8 +523,8 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
// trigger auto flush since buffer full
output := iBNode.Operate([]flowgraph.Msg{iMsg})
fgm := output[0].(*flowGraphMsg)
wg.Add(len(fgm.segmentsToFlush))
for _, im := range fgm.segmentsToFlush {
wg.Add(len(fgm.segmentsToSync))
for _, im := range fgm.segmentsToSync {
// send del done signal
err = fm.flushDelData(nil, im, fgm.endPositions[0])
assert.NoError(t, err)
@ -542,221 +543,208 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
})
}
func TestFlowGraphInsertBufferNode_DropPartition(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
type InsertBufferNodeSuit struct {
suite.Suite
partitionID := int64(1)
channel *ChannelMeta
testPath := "/test/datanode/root/meta"
err := clearEtcd(testPath)
require.NoError(t, err)
Params.EtcdCfg.MetaRootPath = testPath
collID UniqueID
partID UniqueID
cm *storage.LocalChunkManager
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
dataFactory := NewDataFactory()
originalConfig int64
}
mockRootCoord := &RootCoordFactory{
func (s *InsertBufferNodeSuit) SetupSuite() {
insertBufferNodeTestDir := "/tmp/milvus_test/insert_buffer_node"
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
channel := &ChannelMeta{
collectionID: collMeta.ID,
segments: make(map[UniqueID]*Segment),
s.collID = 1
s.partID = 10
s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
s.cm = storage.NewLocalChunkManager(storage.RootPath(insertBufferNodeTestDir))
s.originalConfig = Params.DataNodeCfg.FlushInsertBufferSize
// change flushing size to 2 rows
Params.DataNodeCfg.FlushInsertBufferSize = 4 * 4
}
func (s *InsertBufferNodeSuit) TearDownSuite() {
s.cm.RemoveWithPrefix(context.Background(), "")
Params.DataNodeCfg.FlushInsertBufferSize = s.originalConfig
}
func (s *InsertBufferNodeSuit) SetupTest() {
segs := []struct {
segID UniqueID
sType datapb.SegmentType
}{
{1, datapb.SegmentType_New},
{2, datapb.SegmentType_Normal},
{3, datapb.SegmentType_Flushed},
}
channel.metaService = newMetaService(mockRootCoord, collMeta.ID)
factory := dependency.NewDefaultFactory(true)
flushPacks := []*segmentFlushPack{}
fpMut := sync.Mutex{}
wg := sync.WaitGroup{}
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix(ctx, "")
fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, channel, func(pack *segmentFlushPack) {
fpMut.Lock()
flushPacks = append(flushPacks, pack)
fpMut.Unlock()
channel.listNewSegmentsStartPositions()
if pack.flushed || pack.dropped {
channel.segmentFlushed(pack.segmentID)
}
wg.Done()
}, emptyFlushAndDropFunc)
flushChan := make(chan flushMsg, 100)
resendTTChan := make(chan resendTTMsg, 100)
c := &nodeConfig{
channel: channel,
msFactory: factory,
allocator: NewAllocatorFactory(),
vChannelName: "string",
for _, seg := range segs {
err := s.channel.addSegment(addSegmentReq{
segType: seg.sType,
segID: seg.segID,
collID: s.collID,
partitionID: s.partID,
startPos: new(internalpb.MsgPosition),
endPos: new(internalpb.MsgPosition),
})
s.Require().NoError(err)
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err)
}
// Auto flush number of rows set to 2
func (s *InsertBufferNodeSuit) TearDownTest() {
s.channel.removeSegments(1, 2, 3)
}
inMsg := genFlowGraphInsertMsg("datanode-03-test-autoflush")
inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(2)
var iMsg flowgraph.Msg = &inMsg
func (s *InsertBufferNodeSuit) TestFillInSyncTasks() {
s.Run("drop collection", func() {
fgMsg := &flowGraphMsg{dropCollection: true}
t.Run("Only drop partition", func(t *testing.T) {
// iBNode.insertBuffer.maxSize = 2
tmp := Params.DataNodeCfg.FlushInsertBufferSize
Params.DataNodeCfg.FlushInsertBufferSize = 4 * 4
defer func() {
Params.DataNodeCfg.FlushInsertBufferSize = tmp
}()
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = int64(i%2) + 1
inMsg.insertMessages[i].PartitionID = partitionID
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 100}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 123}}
type Test struct {
expectedSegID UniqueID
expectedNumOfRows int64
expectedStartPosTs Timestamp
expectedEndPosTs Timestamp
expectedCpNumOfRows int64
expectedCpPosTs Timestamp
node := &insertBufferNode{
channelName: s.channel.channelName,
channel: s.channel,
flushChan: make(chan flushMsg, 100),
}
beforeAutoFlushTests := []Test{
// segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts
{1, 1, 100, 123, 0, 100},
{2, 1, 100, 123, 0, 100},
}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, 0, len(flushPacks))
for i, test := range beforeAutoFlushTests {
channel.segMu.Lock()
seg, ok := channel.segments[UniqueID(i+1)]
channel.segMu.Unlock()
assert.True(t, ok)
assert.Equal(t, datapb.SegmentType_New, seg.getType())
assert.Equal(t, partitionID, seg.partitionID)
assert.Equal(t, test.expectedSegID, seg.segmentID)
assert.Equal(t, test.expectedNumOfRows, seg.numRows)
assert.Equal(t, test.expectedStartPosTs, seg.startPos.GetTimestamp())
}
inMsg.insertMessages = nil
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 200}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
inMsg.dropPartitions = []int64{partitionID}
iMsg = &inMsg
// Trigger drop partition
output := iBNode.Operate([]flowgraph.Msg{iMsg})
fgm := output[0].(*flowGraphMsg)
wg.Add(len(fgm.segmentsToFlush))
t.Log("segments to flush", fgm.segmentsToFlush)
for _, im := range fgm.segmentsToFlush {
// send del done signal
err = fm.flushDelData(nil, im, fgm.endPositions[0])
assert.NoError(t, err)
}
wg.Wait()
assert.Equal(t, 2, len(flushPacks))
assert.Less(t, 0, len(flushPacks[0].insertLogs))
for _, flushPack := range flushPacks {
assert.True(t, flushPack.flushed)
assert.True(t, flushPack.dropped)
}
})
t.Run("drop partition with flush", func(t *testing.T) {
tmp := Params.DataNodeCfg.FlushInsertBufferSize
Params.DataNodeCfg.FlushInsertBufferSize = 4 * 4
defer func() {
Params.DataNodeCfg.FlushInsertBufferSize = tmp
}()
fpMut.Lock()
flushPacks = flushPacks[:0]
fpMut.Unlock()
inMsg := genFlowGraphInsertMsg("datanode-03-test-autoflush")
inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(2)
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = UniqueID(10 + i)
inMsg.insertMessages[i].PartitionID = partitionID
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 300}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 323}}
var iMsg flowgraph.Msg = &inMsg
type Test struct {
expectedSegID UniqueID
expectedNumOfRows int64
expectedStartPosTs Timestamp
expectedEndPosTs Timestamp
expectedCpNumOfRows int64
expectedCpPosTs Timestamp
}
beforeAutoFlushTests := []Test{
// segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts
{10, 1, 300, 323, 0, 300},
{11, 1, 300, 323, 0, 300},
}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, 0, len(flushPacks))
for _, test := range beforeAutoFlushTests {
channel.segMu.Lock()
seg, ok := channel.segments[test.expectedSegID]
channel.segMu.Unlock()
assert.True(t, ok)
assert.Equal(t, datapb.SegmentType_New, seg.getType())
assert.Equal(t, partitionID, seg.partitionID)
assert.Equal(t, test.expectedSegID, seg.segmentID)
assert.Equal(t, test.expectedNumOfRows, seg.numRows)
assert.Equal(t, test.expectedStartPosTs, seg.startPos.GetTimestamp())
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 400}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 434}}
inMsg.dropPartitions = []int64{partitionID}
// trigger manual flush
flushChan <- flushMsg{
segmentID: 10,
flushed: true,
}
// trigger auto flush since buffer full
output := iBNode.Operate([]flowgraph.Msg{iMsg})
fgm := output[0].(*flowGraphMsg)
wg.Add(len(fgm.segmentsToFlush))
for _, im := range fgm.segmentsToFlush {
// send del done signal
err = fm.flushDelData(nil, im, fgm.endPositions[0])
assert.NoError(t, err)
}
wg.Wait()
assert.Equal(t, 4, len(flushPacks))
for _, pack := range flushPacks {
assert.True(t, pack.flushed)
assert.True(t, pack.dropped)
syncTasks := node.FillInSyncTasks(fgMsg, nil)
s.Assert().NotEmpty(syncTasks)
s.Assert().Equal(3, len(syncTasks))
for _, task := range syncTasks {
s.Assert().True(task.dropped)
s.Assert().False(task.flushed)
s.Assert().Nil(task.buffer)
}
})
s.Run("auto sync", func() {
segToFlush := []UniqueID{1, 2}
node := &insertBufferNode{
channelName: s.channel.channelName,
channel: s.channel,
flushChan: make(chan flushMsg, 100),
}
buffer := BufferData{
buffer: nil,
size: 2,
limit: 2,
}
node.insertBuffer.Store(UniqueID(1), &buffer)
syncTasks := node.FillInSyncTasks(new(flowGraphMsg), segToFlush)
s.Assert().NotEmpty(syncTasks)
s.Assert().Equal(1, len(syncTasks))
task, ok := syncTasks[UniqueID(1)]
s.Assert().True(ok)
s.Assert().Equal(UniqueID(1), task.segmentID)
s.Assert().True(task.auto)
s.Assert().False(task.flushed)
s.Assert().False(task.dropped)
})
s.Run("drop partition", func() {
fgMsg := flowGraphMsg{dropPartitions: []UniqueID{s.partID}}
node := &insertBufferNode{
channelName: s.channel.channelName,
channel: s.channel,
flushChan: make(chan flushMsg, 100),
}
syncTasks := node.FillInSyncTasks(&fgMsg, nil)
s.Assert().NotEmpty(syncTasks)
s.Assert().Equal(3, len(syncTasks))
for _, task := range syncTasks {
s.Assert().False(task.auto)
s.Assert().True(task.flushed)
s.Assert().True(task.dropped)
}
})
s.Run("manual sync", func() {
flushCh := make(chan flushMsg, 100)
node := &insertBufferNode{
channelName: s.channel.channelName,
channel: s.channel,
flushChan: flushCh,
}
for i := 1; i <= 3; i++ {
msg := flushMsg{
segmentID: UniqueID(i),
collectionID: s.collID,
flushed: i%2 == 0, // segID=2, flushed = true
}
flushCh <- msg
}
syncTasks := node.FillInSyncTasks(new(flowGraphMsg), nil)
s.Assert().NotEmpty(syncTasks)
for segID, task := range syncTasks {
if segID == UniqueID(2) {
s.Assert().True(task.flushed)
} else {
s.Assert().False(task.flushed)
}
s.Assert().False(task.auto)
s.Assert().False(task.dropped)
}
})
s.Run("manual sync over load", func() {
flushCh := make(chan flushMsg, 100)
node := &insertBufferNode{
channelName: s.channel.channelName,
channel: s.channel,
flushChan: flushCh,
}
for i := 1; i <= 100; i++ {
msg := flushMsg{
segmentID: UniqueID(i),
collectionID: s.collID,
flushed: false,
}
if i == 2 {
msg.flushed = true
}
flushCh <- msg
}
syncTasks := node.FillInSyncTasks(new(flowGraphMsg), nil)
s.Assert().NotEmpty(syncTasks)
s.Assert().Equal(10, len(syncTasks)) // 10 is max batch
for segID, task := range syncTasks {
if segID == UniqueID(2) {
s.Assert().True(task.flushed)
} else {
s.Assert().False(task.flushed)
}
s.Assert().False(task.auto)
s.Assert().False(task.dropped)
}
})
}
func TestInsertBufferNodeSuite(t *testing.T) {
suite.Run(t, new(InsertBufferNodeSuit))
}
// CompactedRootCoord has meta info compacted at ts
@ -934,3 +922,36 @@ func TestInsertBufferNode_getTimestampRange(t *testing.T) {
})
}
}
func TestInsertBufferNode_collectSegmentsToSync(t *testing.T) {
tests := []struct {
description string
inFlushMsgNum int
expectedOutNum int
}{
{"batch 1 < maxBatch 10", 1, 1},
{"batch 5 < maxBatch 10", 5, 5},
{"batch 10 = maxBatch 10", 10, 10},
{"batch 20 = maxBatch 10", 20, 10},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
flushCh := make(chan flushMsg, 100)
node := &insertBufferNode{
flushChan: flushCh,
channelName: "channel" + test.description,
}
for i := 0; i < test.inFlushMsgNum; i++ {
flushCh <- flushMsg{
segmentID: UniqueID(i),
flushed: i%2 == 0,
}
}
flushedSegs, staleSegs := node.CollectSegmentsToSync()
assert.Equal(t, test.expectedOutNum, len(flushedSegs)+len(staleSegs))
})
}
}

@ -46,10 +46,10 @@ type flowGraphMsg struct {
timeRange TimeRange
startPositions []*internalpb.MsgPosition
endPositions []*internalpb.MsgPosition
//segmentsToFlush is the signal used by insertBufferNode to notify deleteNode to flush
segmentsToFlush []UniqueID
dropCollection bool
dropPartitions []UniqueID
// segmentsToSync is the signal used by insertBufferNode to notify deleteNode to flush the corresponding delete buffers
segmentsToSync []UniqueID
dropCollection bool
dropPartitions []UniqueID
}
func (fgMsg *flowGraphMsg) TimeTick() Timestamp {