Add BufferData in insertbufferNode (#8138)

Optimize bufferInsertMsg param to avoid a second transfer
of endposition

See also: #8058, #7741

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/8207/head
XuanYang-cn 2021-09-18 14:25:50 +08:00 committed by GitHub
parent 2847c0a135
commit 46c72c57c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 17 deletions

View File

@ -18,4 +18,6 @@ dataNode:
flush: flush:
# max buffer size to flush # max buffer size to flush
insertBufSize: 32000 # number of rows insertBufSize: 32000 # GOOSE TODO: to delete
# insertBufSize: 16 # MB GOOSE TODO: to enable

View File

@ -54,10 +54,12 @@ type insertBufferNode struct {
BaseNode BaseNode
channelName string channelName string
insertBuffer *insertBuffer insertBuffer *insertBuffer
replica Replica // insertBuffer map[UniqueID]*BufferData // SegmentID to BufferData
idAllocator allocatorInterface replica Replica
flushMap sync.Map idAllocator allocatorInterface
flushChan <-chan *flushMsg
flushMap sync.Map
flushChan <-chan *flushMsg
minIOKV kv.BaseKV minIOKV kv.BaseKV
@ -81,6 +83,22 @@ type segmentFlushUnit struct {
flushed bool flushed bool
} }
type BufferData struct {
buffer *InsertData
size int64
limit int64 // Num of rows
}
func newBufferData(dimension int64) (*BufferData, error) {
if dimension == 0 {
return nil, errors.New("Invalid dimension")
}
limit := Params.FlushInsertBufferSize * (1 << 18) / dimension
return &BufferData{&InsertData{}, 0, limit}, nil
}
type insertBuffer struct { type insertBuffer struct {
insertData map[UniqueID]*InsertData // SegmentID to InsertData insertData map[UniqueID]*InsertData // SegmentID to InsertData
maxSize int64 maxSize int64
@ -199,7 +217,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
// insert messages -> buffer // insert messages -> buffer
for _, msg := range iMsg.insertMessages { for _, msg := range iMsg.insertMessages {
err := ibNode.bufferInsertMsg(iMsg, msg) err := ibNode.bufferInsertMsg(msg, endPositions[0])
if err != nil { if err != nil {
log.Warn("msg to buffer failed", zap.Error(err)) log.Warn("msg to buffer failed", zap.Error(err))
} }
@ -383,7 +401,7 @@ func (ibNode *insertBufferNode) updateSegStatesInReplica(insertMsgs []*msgstream
// 1.2 Get buffer data and put data into each field buffer // 1.2 Get buffer data and put data into each field buffer
// 1.3 Put back into buffer // 1.3 Put back into buffer
// 1.4 Update related statistics // 1.4 Update related statistics
func (ibNode *insertBufferNode) bufferInsertMsg(iMsg *insertMsg, msg *msgstream.InsertMsg) error { func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos *internalpb.MsgPosition) error {
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) { if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
return errors.New("misaligned messages detected") return errors.New("misaligned messages detected")
} }
@ -625,13 +643,8 @@ func (ibNode *insertBufferNode) bufferInsertMsg(iMsg *insertMsg, msg *msgstream.
ibNode.insertBuffer.insertData[currentSegID] = idata ibNode.insertBuffer.insertData[currentSegID] = idata
// store current endPositions as Segment->EndPostion // store current endPositions as Segment->EndPostion
endPositions := make([]*internalpb.MsgPosition, 0, len(iMsg.endPositions)) ibNode.replica.updateSegmentEndPosition(currentSegID, endPos)
for idx := range iMsg.endPositions {
pos := proto.Clone(iMsg.endPositions[idx]).(*internalpb.MsgPosition)
pos.ChannelName = ibNode.channelName
endPositions = append(endPositions, pos)
}
ibNode.replica.updateSegmentEndPosition(currentSegID, endPositions[0])
// update segment pk filter // update segment pk filter
ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs()) ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs())
return nil return nil

View File

@ -692,20 +692,20 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
inMsg := genInsertMsg(insertChannelName) inMsg := genInsertMsg(insertChannelName)
for _, msg := range inMsg.insertMessages { for _, msg := range inMsg.insertMessages {
msg.EndTimestamp = 101 // ts valid msg.EndTimestamp = 101 // ts valid
err = iBNode.bufferInsertMsg(&inMsg, msg) err = iBNode.bufferInsertMsg(msg, &internalpb.MsgPosition{})
assert.Nil(t, err) assert.Nil(t, err)
} }
for _, msg := range inMsg.insertMessages { for _, msg := range inMsg.insertMessages {
msg.EndTimestamp = 99 // ts invalid msg.EndTimestamp = 99 // ts invalid
err = iBNode.bufferInsertMsg(&inMsg, msg) err = iBNode.bufferInsertMsg(msg, &internalpb.MsgPosition{})
assert.NotNil(t, err) assert.NotNil(t, err)
} }
for _, msg := range inMsg.insertMessages { for _, msg := range inMsg.insertMessages {
msg.EndTimestamp = 101 // ts valid msg.EndTimestamp = 101 // ts valid
msg.RowIDs = []int64{} //misaligned data msg.RowIDs = []int64{} //misaligned data
err = iBNode.bufferInsertMsg(&inMsg, msg) err = iBNode.bufferInsertMsg(msg, &internalpb.MsgPosition{})
assert.NotNil(t, err) assert.NotNil(t, err)
} }
} }
@ -743,3 +743,39 @@ func TestInsertBufferNode_updateSegStatesInReplica(te *testing.T) {
} }
} }
func TestInsertBufferNode_BufferData(te *testing.T) {
Params.FlushInsertBufferSize = 16
tests := []struct {
isValid bool
indim int64
expectedLimit int64
description string
}{
{true, 1, 4194304, "Smallest of the DIM"},
{true, 128, 32768, "Normal DIM"},
{true, 32768, 128, "Largest DIM"},
{false, 0, 0, "Illegal DIM"},
}
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
idata, err := newBufferData(test.indim)
if test.isValid {
assert.NoError(t, err)
assert.NotNil(t, idata)
assert.Equal(t, test.expectedLimit, idata.limit)
assert.Zero(t, idata.size)
} else {
assert.Error(t, err)
assert.Nil(t, idata)
}
})
}
}