Merge auto manual flush with same segment id (#10550)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/10578/head
congqixia 2021-10-25 18:03:42 +08:00 committed by GitHub
parent bd90a3831c
commit 2713988ff2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 136 additions and 100 deletions

View File

@ -218,55 +218,74 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
segmentsToFlush := make([]UniqueID, 0, len(seg2Upload)+1) //auto flush number + possible manual flush
type flushTask struct {
buffer *BufferData
segmentID UniqueID
flushed bool
}
flushTaskList := make([]flushTask, 0, len(seg2Upload)+1)
// Auto Flush
for _, segToFlush := range seg2Upload {
// If full, auto flush
if bd, ok := ibNode.insertBuffer.Load(segToFlush); ok && bd.(*BufferData).effectiveCap() <= 0 {
log.Warn("Auto flush", zap.Int64("segment id", segToFlush))
ibuffer := bd.(*BufferData)
err := ibNode.flushManager.flushBufferData(ibuffer, segToFlush, false, endPositions[0])
if err != nil {
log.Warn("Failed to flushBufferData", zap.Error(err))
} else {
segmentsToFlush = append(segmentsToFlush, segToFlush)
ibNode.insertBuffer.Delete(segToFlush)
}
flushTaskList = append(flushTaskList, flushTask{
buffer: ibuffer,
segmentID: segToFlush,
flushed: false,
})
}
}
// Manual Flush
select {
case fmsg := <-ibNode.flushChan:
currentSegID := fmsg.segmentID
log.Debug(". Receiving flush message",
zap.Int64("segmentID", currentSegID),
zap.Int64("segmentID", fmsg.segmentID),
zap.Int64("collectionID", fmsg.collectionID),
)
bd, ok := ibNode.insertBuffer.Load(currentSegID)
var err error
var buf *BufferData
if ok {
buf = bd.(*BufferData)
}
if buf == nil || buf.size <= 0 { // Buffer empty
log.Debug(".. Buffer empty ...")
err = ibNode.flushManager.flushBufferData(nil, currentSegID, fmsg.flushed, endPositions[0])
} else { // Buffer not empty
err = ibNode.flushManager.flushBufferData(buf, currentSegID, fmsg.flushed, endPositions[0])
}
if err != nil {
log.Warn("failed to manual invoke flushBufferData", zap.Error(err))
} else {
segmentsToFlush = append(segmentsToFlush, currentSegID)
if fmsg.flushed {
ibNode.replica.segmentFlushed(currentSegID)
// merging auto&manual flush segment same segment id
dup := false
for i, task := range flushTaskList {
if task.segmentID == fmsg.segmentID {
flushTaskList[i].flushed = fmsg.flushed
dup = true
break
}
ibNode.insertBuffer.Delete(currentSegID)
}
// if merged, skip load buffer and create task
if !dup {
currentSegID := fmsg.segmentID
bd, ok := ibNode.insertBuffer.Load(currentSegID)
var buf *BufferData
if ok {
buf = bd.(*BufferData)
}
flushTaskList = append(flushTaskList, flushTask{
buffer: buf,
segmentID: currentSegID,
flushed: fmsg.flushed,
})
}
default:
}
for _, task := range flushTaskList {
err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, endPositions[0])
if err != nil {
log.Warn("failed to invoke flushBufferData", zap.Error(err))
} else {
segmentsToFlush = append(segmentsToFlush, task.segmentID)
if task.flushed {
ibNode.replica.segmentFlushed(task.segmentID)
}
ibNode.insertBuffer.Delete(task.segmentID)
}
}
if err := ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax); err != nil {
log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
}

View File

@ -380,11 +380,14 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
assert.Nil(t, err)
flushPacks := []*segmentFlushPack{}
fpMut := sync.Mutex{}
memkv := memkv.NewMemoryKV()
wg := sync.WaitGroup{}
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, colRep, func(pack *segmentFlushPack) error {
fpMut.Lock()
flushPacks = append(flushPacks, pack)
fpMut.Unlock()
colRep.listNewSegmentsStartPositions()
colRep.listSegmentsCheckPoints()
wg.Done()
@ -442,7 +445,9 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
assert.Equal(t, 0, len(flushPacks))
for i, test := range beforeAutoFlushTests {
colRep.segMu.Lock()
seg, ok := colRep.newSegments[UniqueID(i+1)]
colRep.segMu.Unlock()
assert.True(t, ok)
assert.Equal(t, test.expectedSegID, seg.segmentID)
assert.Equal(t, test.expectedNumOfRows, seg.numRows)
@ -508,77 +513,89 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
})
t.Run("Auto with manual flush", func(t *testing.T) {
t.Skipf("Skip, fix later")
/*
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = 1
tmp := Params.FlushInsertBufferSize
Params.FlushInsertBufferSize = 4 * 4
defer func() {
Params.FlushInsertBufferSize = tmp
}()
fpMut.Lock()
flushPacks = flushPacks[:0]
fpMut.Unlock()
inMsg := GenFlowGraphInsertMsg("datanode-03-test-autoflush")
inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(2)
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = UniqueID(10 + i)
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 300}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 323}}
var iMsg flowgraph.Msg = &inMsg
type Test struct {
expectedSegID UniqueID
expectedNumOfRows int64
expectedStartPosTs Timestamp
expectedEndPosTs Timestamp
expectedCpNumOfRows int64
expectedCpPosTs Timestamp
}
beforeAutoFlushTests := []Test{
// segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts
{10, 1, 300, 323, 0, 300},
{11, 1, 300, 323, 0, 300},
}
iBNode.Operate([]flowgraph.Msg{iMsg})
require.Equal(t, 2, len(colRep.newSegments))
require.Equal(t, 3, len(colRep.normalSegments))
assert.Equal(t, 0, len(flushPacks))
for _, test := range beforeAutoFlushTests {
colRep.segMu.Lock()
seg, ok := colRep.newSegments[test.expectedSegID]
colRep.segMu.Unlock()
assert.True(t, ok)
assert.Equal(t, test.expectedSegID, seg.segmentID)
assert.Equal(t, test.expectedNumOfRows, seg.numRows)
assert.Equal(t, test.expectedStartPosTs, seg.startPos.GetTimestamp())
assert.Equal(t, test.expectedCpNumOfRows, seg.checkPoint.numRows)
assert.Equal(t, test.expectedCpPosTs, seg.checkPoint.pos.GetTimestamp())
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 400}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 434}}
// trigger manual flush
flushChan <- flushMsg{
segmentID: 10,
flushed: true,
}
// trigger auto flush since buffer full
output := iBNode.Operate([]flowgraph.Msg{iMsg})
fgm := output[0].(*flowGraphMsg)
wg.Add(len(fgm.segmentsToFlush))
for _, im := range fgm.segmentsToFlush {
// send del done signal
fm.flushDelData(nil, im, fgm.endPositions[0])
}
wg.Wait()
require.Equal(t, 0, len(colRep.newSegments))
require.Equal(t, 4, len(colRep.normalSegments))
require.Equal(t, 1, len(colRep.flushedSegments))
assert.Equal(t, 2, len(flushPacks))
for _, pack := range flushPacks {
if pack.segmentID == 10 {
assert.Equal(t, true, pack.flushed)
} else {
assert.Equal(t, false, pack.flushed)
}
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, len(flushUnit), 2)
assert.Equal(t, flushUnit[1].segID, int64(1))
assert.Equal(t, len(flushUnit[1].checkPoint), 3)
assert.Equal(t, flushUnit[1].checkPoint[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, flushUnit[1].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[1].checkPoint[3].numRows, int64(0))
assert.Equal(t, flushUnit[1].checkPoint[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[1].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[1].checkPoint[3].pos.Timestamp, Timestamp(123))
assert.False(t, flushUnit[1].flushed)
assert.Greater(t, len(flushUnit[1].field2Path), 0)
// assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
// assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
flushChan <- flushMsg{
msgID: 3,
timestamp: 456,
segmentID: UniqueID(1),
collectionID: UniqueID(1),
}
inMsg.insertMessages = []*msgstream.InsertMsg{}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 456}}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, len(flushUnit), 3)
assert.Equal(t, flushUnit[2].segID, int64(1))
assert.Equal(t, len(flushUnit[2].checkPoint), 3)
assert.Equal(t, flushUnit[2].checkPoint[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, flushUnit[2].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[2].checkPoint[3].numRows, int64(0))
assert.Equal(t, flushUnit[2].checkPoint[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[2].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[2].checkPoint[3].pos.Timestamp, Timestamp(123))
assert.Equal(t, len(flushUnit[2].field2Path), 0)
assert.NotNil(t, flushUnit[2].field2Path)
assert.True(t, flushUnit[2].flushed)
// assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
// assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
flushChan <- flushMsg{
msgID: 4,
timestamp: 567,
segmentID: UniqueID(3),
collectionID: UniqueID(3),
}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, len(flushUnit), 4)
assert.Equal(t, flushUnit[3].segID, int64(3))
assert.Equal(t, len(flushUnit[3].checkPoint), 2)
assert.Equal(t, flushUnit[3].checkPoint[3].numRows, int64(50+16000))
assert.Equal(t, flushUnit[3].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[3].checkPoint[3].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[3].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Greater(t, len(flushUnit[3].field2Path), 0)
assert.NotNil(t, flushUnit[3].field2Path)
assert.True(t, flushUnit[3].flushed)
// assert.Equal(t, len(iBNode.insertBuffer.insertData), 0)
*/
})
}

View File

@ -434,8 +434,8 @@ func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*dat
// listNewSegmentsStartPositions gets all *New Segments* start positions and
// transfer segments states from *New* to *Normal*.
func (replica *SegmentReplica) listNewSegmentsStartPositions() []*datapb.SegmentStartPosition {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
replica.segMu.Lock()
defer replica.segMu.Unlock()
result := make([]*datapb.SegmentStartPosition, 0, len(replica.newSegments))
for id, seg := range replica.newSegments {