Improve codes in insertbuffernode (#8054)

Add error handling in insertbuffernode

See also: #7624, #7684

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/8138/head
XuanYang-cn 2021-09-17 16:27:56 +08:00 committed by GitHub
parent 82d4d19337
commit 6c434b4801
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 131 additions and 64 deletions

View File

@ -23,7 +23,7 @@ import (
"unsafe"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
@ -31,9 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -51,6 +49,7 @@ type (
InsertData = storage.InsertData
Blob = storage.Blob
)
type insertBufferNode struct {
BaseNode
channelName string
@ -137,24 +136,29 @@ func (ibNode *insertBufferNode) Name() string {
return "ibNode"
}
func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
func (ibNode *insertBufferNode) Close() {
if ibNode.timeTickStream != nil {
ibNode.timeTickStream.Close()
}
if ibNode.segmentStatisticsStream != nil {
ibNode.segmentStatisticsStream.Close()
}
}
func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
// log.Debug("InsertBufferNode Operating")
if len(in) != 1 {
log.Error("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in)))
// TODO: add error handling
return []Msg{}
}
iMsg, ok := in[0].(*insertMsg)
if !ok {
log.Error("type assertion failed for insertMsg")
// TODO: add error handling
}
if iMsg == nil {
ibNode.timeTickStream.Close()
ibNode.segmentStatisticsStream.Close()
ibNode.Close()
return []Msg{}
}
@ -179,46 +183,21 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
endPositions = append(endPositions, pos)
}
// Updating segment statistics
uniqueSeg := make(map[UniqueID]int64)
for _, msg := range iMsg.insertMessages {
currentSegID := msg.GetSegmentID()
collID := msg.GetCollectionID()
partitionID := msg.GetPartitionID()
if !ibNode.replica.hasSegment(currentSegID, true) {
err := ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(),
startPositions[0], endPositions[0])
if err != nil {
log.Error("add segment wrong", zap.Error(err))
}
}
segNum := uniqueSeg[currentSegID]
uniqueSeg[currentSegID] = segNum + int64(len(msg.RowIDs))
// Updating segment statistics in replica
seg2Upload, err := ibNode.updateSegStatesInReplica(iMsg.insertMessages, startPositions[0], endPositions[0])
if err != nil {
log.Warn("update segment states in Replica wrong", zap.Error(err))
return []Msg{}
}
segToUpdate := make([]UniqueID, 0, len(uniqueSeg))
for id, num := range uniqueSeg {
segToUpdate = append(segToUpdate, id)
err := ibNode.replica.updateStatistics(id, num)
if len(seg2Upload) > 0 {
err := ibNode.uploadMemStates2Coord(seg2Upload)
if err != nil {
log.Error("update Segment Row number wrong", zap.Error(err))
log.Error("upload segment statistics to coord error", zap.Error(err))
}
}
if len(segToUpdate) > 0 {
err := ibNode.updateSegStatistics(segToUpdate)
if err != nil {
log.Error("update segment statistics error", zap.Error(err))
}
}
// iMsg is insertMsg
// 1. iMsg -> buffer
// insert messages -> buffer
for _, msg := range iMsg.insertMessages {
err := ibNode.bufferInsertMsg(iMsg, msg)
if err != nil {
@ -226,6 +205,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
}
// TODO GOOSE: log updated segments' states
if len(iMsg.insertMessages) > 0 {
log.Debug("---insert buffer status---")
var stopSign int = 0
@ -239,9 +219,10 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
}
finishCh := make(chan segmentFlushUnit, len(segToUpdate))
// Auto Flush
finishCh := make(chan segmentFlushUnit, len(seg2Upload))
finishCnt := sync.WaitGroup{}
for _, segToFlush := range segToUpdate {
for _, segToFlush := range seg2Upload {
// If full, auto flush
if ibNode.insertBuffer.full(segToFlush) {
log.Debug(". Insert Buffer full, auto flushing ",
@ -281,7 +262,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
}
// iMsg is Flush() msg from datacoord
// Manul Flush
select {
case fmsg := <-ibNode.flushChan:
currentSegID := fmsg.segmentID
@ -362,6 +343,36 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
return nil
}
func (ibNode *insertBufferNode) updateSegStatesInReplica(insertMsgs []*msgstream.InsertMsg, startPos, endPos *internalpb.MsgPosition) (seg2Upload []UniqueID, err error) {
uniqueSeg := make(map[UniqueID]int64)
for _, msg := range insertMsgs {
currentSegID := msg.GetSegmentID()
collID := msg.GetCollectionID()
partitionID := msg.GetPartitionID()
if !ibNode.replica.hasSegment(currentSegID, true) {
err = ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(),
startPos, endPos)
if err != nil {
log.Warn("add segment wrong", zap.Int64("Seg ID", currentSegID), zap.Error(err))
return
}
}
segNum := uniqueSeg[currentSegID]
uniqueSeg[currentSegID] = segNum + int64(len(msg.RowIDs))
}
seg2Upload = make([]UniqueID, 0, len(uniqueSeg))
for id, num := range uniqueSeg {
seg2Upload = append(seg2Upload, id)
ibNode.replica.updateStatistics(id, num)
}
return
}
// bufferInsertMsg put InsertMsg into buffer
// 1.1 fetch related schema from replica
// 1.2 Get buffer data and put data into each field buffer
@ -384,7 +395,6 @@ func (ibNode *insertBufferNode) bufferInsertMsg(iMsg *insertMsg, msg *msgstream.
// 1.1 Get Collection Schema
collSchema, err := ibNode.replica.getCollectionSchema(collectionID, msg.EndTs())
if err != nil {
// GOOSE TODO add error handler
log.Error("Get schema wrong:", zap.Error(err))
return err
}
@ -414,7 +424,6 @@ func (ibNode *insertBufferNode) bufferInsertMsg(iMsg *insertMsg, msg *msgstream.
if dim <= 0 {
log.Error("invalid dim")
continue
// TODO: add error handling
}
if _, ok := idata.Data[field.FieldID]; !ok {
@ -453,7 +462,7 @@ func (ibNode *insertBufferNode) bufferInsertMsg(iMsg *insertMsg, msg *msgstream.
}
if dim <= 0 {
log.Error("invalid dim")
// TODO: add error handling
continue
}
if _, ok := idata.Data[field.FieldID]; !ok {
@ -759,7 +768,7 @@ func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
return ibNode.timeTickStream.Produce(&msgPack)
}
func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
func (ibNode *insertBufferNode) uploadMemStates2Coord(segIDs []UniqueID) error {
log.Debug("Updating segments statistics...")
statsUpdates := make([]*internalpb.SegmentStatisticsUpdates, 0, len(segIDs))
for _, segID := range segIDs {

View File

@ -111,7 +111,35 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
assert.Error(t, err)
}
type mockMsg struct{}
func (*mockMsg) TimeTick() Timestamp {
return 0
}
func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
t.Run("Test iBNode Operate invalid Msg", func(te *testing.T) {
invalidInTests := []struct {
in []Msg
description string
}{
{[]Msg{},
"Invalid input length == 0"},
{[]Msg{&insertMsg{}, &insertMsg{}, &insertMsg{}},
"Invalid input length == 3"},
{[]Msg{&mockMsg{}},
"Invalid input length == 1 but input message is not insertMsg"},
}
for _, test := range invalidInTests {
te.Run(test.description, func(t0 *testing.T) {
ibn := &insertBufferNode{}
rt := ibn.Operate(test.in)
assert.Empty(t0, rt)
})
}
})
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@ -681,3 +709,37 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
assert.NotNil(t, err)
}
}
func TestInsertBufferNode_updateSegStatesInReplica(te *testing.T) {
invalideTests := []struct {
replicaCollID UniqueID
inCollID UniqueID
segID UniqueID
description string
}{
{1, 9, 100, "collectionID mismatch"},
}
for _, test := range invalideTests {
ibNode := &insertBufferNode{
replica: newReplica(&RootCoordFactory{}, test.replicaCollID),
}
im := []*msgstream.InsertMsg{
{
InsertRequest: internalpb.InsertRequest{
CollectionID: test.inCollID,
SegmentID: test.segID,
},
},
}
seg, err := ibNode.updateSegStatesInReplica(im, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
assert.Error(te, err)
assert.Empty(te, seg)
}
}

View File

@ -49,7 +49,7 @@ type Replica interface {
updateSegmentPKRange(segID UniqueID, rowIDs []int64)
hasSegment(segID UniqueID, countFlushed bool) bool
updateStatistics(segID UniqueID, numRows int64) error
updateStatistics(segID UniqueID, numRows int64)
getSegmentStatisticsUpdates(segID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
segmentFlushed(segID UniqueID)
}
@ -362,7 +362,7 @@ func (replica *SegmentReplica) hasSegment(segID UniqueID, countFlushed bool) boo
}
// updateStatistics updates the number of rows of a segment in replica.
func (replica *SegmentReplica) updateStatistics(segID UniqueID, numRows int64) error {
func (replica *SegmentReplica) updateStatistics(segID UniqueID, numRows int64) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
@ -370,16 +370,16 @@ func (replica *SegmentReplica) updateStatistics(segID UniqueID, numRows int64) e
if seg, ok := replica.newSegments[segID]; ok {
seg.memorySize = 0
seg.numRows += numRows
return nil
return
}
if seg, ok := replica.normalSegments[segID]; ok {
seg.memorySize = 0
seg.numRows += numRows
return nil
return
}
return fmt.Errorf("There's no segment %v", segID)
log.Warn("update segment num row not exist", zap.Int64("segID", segID))
}
// getSegmentStatisticsUpdates gives current segment's statistics updates.

View File

@ -416,9 +416,8 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
sr.flushedSegments[test.flushedSegID] = &Segment{}
}
err := sr.updateStatistics(test.inSegID, test.inNumRows)
sr.updateStatistics(test.inSegID, test.inNumRows)
if test.isvalidCase {
assert.NoError(t, err)
updates, err := sr.getSegmentStatisticsUpdates(test.inSegID)
assert.NoError(t, err)
@ -427,7 +426,6 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
sr.updateSegmentCheckPoint(10000)
} else {
assert.Error(t, err)
updates, err := sr.getSegmentStatisticsUpdates(test.inSegID)
assert.Error(t, err)
assert.Nil(t, updates)
@ -501,8 +499,7 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
assert.True(t, seg.isNew.Load().(bool))
assert.False(t, seg.isFlushed.Load().(bool))
err = replica.updateStatistics(0, 10)
assert.NoError(t, err)
replica.updateStatistics(0, 10)
assert.Equal(t, int64(10), seg.numRows)
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
@ -527,8 +524,7 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
err = replica.addNormalSegment(1, 100000, 2, "invalid", int64(0), &segmentCheckPoint{})
assert.Error(t, err)
err = replica.updateStatistics(1, 10)
assert.NoError(t, err)
replica.updateStatistics(1, 10)
assert.Equal(t, int64(20), seg.numRows)
segPos := replica.listNewSegmentsStartPositions()