Refactor insertBufferNode and add unit tests (#7621)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/7631/head
congqixia 2021-09-09 15:00:00 +08:00 committed by GitHub
parent dbd2a02462
commit 69794fd32d
2 changed files with 403 additions and 271 deletions
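The core of the change is an extract-method refactor: the per-message buffering logic inside Operate moves into a new bufferInsertMsg method that reports failure through an error return instead of logging and continuing inline, which is what makes the new unit tests possible. A minimal sketch of that pattern, with hypothetical simplified types standing in for the real msgstream and flowgraph structures:

package main

import (
	"errors"
	"fmt"
)

// insertMsg is a hypothetical stand-in for msgstream.InsertMsg.
type insertMsg struct {
	RowIDs     []int64
	Timestamps []uint64
	RowData    [][]byte
}

type bufferNode struct{}

// bufferMsg mirrors the new contract: validate, buffer, and report
// failure through the error return rather than a mid-loop continue.
func (n *bufferNode) bufferMsg(msg *insertMsg) error {
	if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
		return errors.New("misaligned messages detected")
	}
	// ... decode each field into its buffer ...
	return nil
}

func main() {
	msgs := []*insertMsg{
		{RowIDs: []int64{1}, Timestamps: []uint64{10}, RowData: [][]byte{{0}}},
		{RowIDs: []int64{1, 2}, Timestamps: []uint64{10}, RowData: [][]byte{{0}}}, // misaligned
	}
	n := &bufferNode{}
	for _, m := range msgs {
		if err := n.bufferMsg(m); err != nil {
			fmt.Println("msg to buffer failed:", err) // caller just logs, as Operate now does
		}
	}
}

Returning the error moves the policy decision (skip the message and warn) up to the single call site in Operate, and gives tests something concrete to assert on.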


@@ -15,6 +15,7 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"path"
"strconv"
@@ -217,278 +218,10 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// iMsg is insertMsg
// 1. iMsg -> buffer
for _, msg := range iMsg.insertMessages {
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
log.Error("misaligned messages detected")
continue
}
currentSegID := msg.GetSegmentID()
collectionID := msg.GetCollectionID()
idata, ok := ibNode.insertBuffer.insertData[currentSegID]
if !ok {
idata = &InsertData{
Data: make(map[UniqueID]storage.FieldData),
}
}
// 1.1 Get Collection Schema
collSchema, err := ibNode.replica.getCollectionSchema(collectionID, msg.EndTs())
if err != nil {
// GOOSE TODO add error handler
log.Error("Get schema wrong:", zap.Error(err))
continue
}
err := ibNode.bufferInsertMsg(iMsg, msg)
if err != nil {
log.Warn("msg to buffer failed", zap.Error(err))
}
// 1.2 Get Fields
var pos int = 0 // Record position of blob
var fieldIDs []int64
var fieldTypes []schemapb.DataType
for _, field := range collSchema.Fields {
fieldIDs = append(fieldIDs, field.FieldID)
fieldTypes = append(fieldTypes, field.DataType)
}
for _, field := range collSchema.Fields {
switch field.DataType {
case schemapb.DataType_FloatVector:
var dim int
for _, t := range field.TypeParams {
if t.Key == "dim" {
dim, err = strconv.Atoi(t.Value)
if err != nil {
log.Error("strconv wrong on get dim", zap.Error(err))
}
break
}
}
if dim <= 0 {
log.Error("invalid dim")
continue
// TODO: add error handling
}
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
NumRows: make([]int64, 0, 1),
Data: make([]float32, 0),
Dim: dim,
}
}
fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)
var offset int
for _, blob := range msg.RowData {
offset = 0
for j := 0; j < dim; j++ {
var v float32
buf := bytes.NewBuffer(blob.GetValue()[pos+offset:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Error("binary.read float32 wrong", zap.Error(err))
}
fieldData.Data = append(fieldData.Data, v)
offset += int(unsafe.Sizeof(*(&v)))
}
}
pos += offset
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_BinaryVector:
var dim int
for _, t := range field.TypeParams {
if t.Key == "dim" {
dim, err = strconv.Atoi(t.Value)
if err != nil {
log.Error("strconv wrong")
}
break
}
}
if dim <= 0 {
log.Error("invalid dim")
// TODO: add error handling
}
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
NumRows: make([]int64, 0, 1),
Data: make([]byte, 0),
Dim: dim,
}
}
fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)
var offset int
for _, blob := range msg.RowData {
bv := blob.GetValue()[pos : pos+(dim/8)]
fieldData.Data = append(fieldData.Data, bv...)
offset = len(bv)
}
pos += offset
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Bool:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.BoolFieldData{
NumRows: make([]int64, 0, 1),
Data: make([]bool, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
var v bool
for _, blob := range msg.RowData {
buf := bytes.NewReader(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Error("binary.Read bool wrong", zap.Error(err))
}
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int8:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int8FieldData{
NumRows: make([]int64, 0, 1),
Data: make([]int8, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
var v int8
for _, blob := range msg.RowData {
buf := bytes.NewReader(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Error("binary.Read int8 wrong", zap.Error(err))
}
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int16:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int16FieldData{
NumRows: make([]int64, 0, 1),
Data: make([]int16, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
var v int16
for _, blob := range msg.RowData {
buf := bytes.NewReader(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Error("binary.Read int16 wrong", zap.Error(err))
}
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int32:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int32FieldData{
NumRows: make([]int64, 0, 1),
Data: make([]int32, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
var v int32
for _, blob := range msg.RowData {
buf := bytes.NewReader(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Error("binary.Read int64 wrong", zap.Error(err))
}
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int64:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int64FieldData{
NumRows: make([]int64, 0, 1),
Data: make([]int64, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
switch field.FieldID {
case 0: // rowIDs
fieldData.Data = append(fieldData.Data, msg.RowIDs...)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case 1: // Timestamps
for _, ts := range msg.Timestamps {
fieldData.Data = append(fieldData.Data, int64(ts))
}
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
default:
var v int64
for _, blob := range msg.RowData {
buf := bytes.NewBuffer(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Error("binary.Read int64 wrong", zap.Error(err))
}
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
}
case schemapb.DataType_Float:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.FloatFieldData{
NumRows: make([]int64, 0, 1),
Data: make([]float32, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
var v float32
for _, blob := range msg.RowData {
buf := bytes.NewBuffer(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Error("binary.Read float32 wrong", zap.Error(err))
}
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Double:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.DoubleFieldData{
NumRows: make([]int64, 0, 1),
Data: make([]float64, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
var v float64
for _, blob := range msg.RowData {
buf := bytes.NewBuffer(blob.GetValue()[pos:])
if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
log.Error("binary.Read float64 wrong", zap.Error(err))
}
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
}
}
// 1.3 store in buffer
ibNode.insertBuffer.insertData[currentSegID] = idata
// store current endPositions as Segment->EndPosition
ibNode.replica.updateSegmentEndPosition(currentSegID, iMsg.endPositions[0])
// update segment pk filter
ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs())
}
if len(iMsg.insertMessages) > 0 {
@@ -627,6 +360,269 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
return nil
}
// bufferInsertMsg puts the InsertMsg into the buffer
// 1.1 Fetch the related schema from the replica
// 1.2 Get the buffer data and put the data into each field buffer
// 1.3 Put the buffer data back into insertBuffer
// 1.4 Update the related statistics
func (ibNode *insertBufferNode) bufferInsertMsg(iMsg *insertMsg, msg *msgstream.InsertMsg) error {
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
return errors.New("misaligned messages detected")
}
currentSegID := msg.GetSegmentID()
collectionID := msg.GetCollectionID()
idata, ok := ibNode.insertBuffer.insertData[currentSegID]
if !ok {
idata = &InsertData{
Data: make(map[UniqueID]storage.FieldData),
}
}
// 1.1 Get Collection Schema
collSchema, err := ibNode.replica.getCollectionSchema(collectionID, msg.EndTs())
if err != nil {
// GOOSE TODO add error handler
log.Error("Get schema wrong:", zap.Error(err))
return err
}
// 1.2 Get Fields
var pos int = 0 // Record position of blob
var fieldIDs []int64
var fieldTypes []schemapb.DataType
for _, field := range collSchema.Fields {
fieldIDs = append(fieldIDs, field.FieldID)
fieldTypes = append(fieldTypes, field.DataType)
}
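// NOTE: fieldIDs and fieldTypes are collected above but not referenced again in this function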
for _, field := range collSchema.Fields {
switch field.DataType {
case schemapb.DataType_FloatVector:
var dim int
for _, t := range field.TypeParams {
if t.Key == "dim" {
dim, err = strconv.Atoi(t.Value)
if err != nil {
log.Error("strconv wrong on get dim", zap.Error(err))
}
break
}
}
if dim <= 0 {
log.Error("invalid dim")
continue
// TODO: add error handling
}
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.FloatVectorFieldData{
NumRows: make([]int64, 0, 1),
Data: make([]float32, 0),
Dim: dim,
}
}
fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)
var offset int
for _, blob := range msg.RowData {
offset = 0
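// each row blob stores the vector as dim consecutive little-endian float32 values (4 bytes apiece); pos marks where this field begins inside the row blob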
for j := 0; j < dim; j++ {
var v float32
readBinary(blob.GetValue()[pos+offset:], &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
offset += int(unsafe.Sizeof(*(&v)))
}
}
pos += offset
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_BinaryVector:
var dim int
for _, t := range field.TypeParams {
if t.Key == "dim" {
dim, err = strconv.Atoi(t.Value)
if err != nil {
log.Error("strconv wrong")
}
break
}
}
if dim <= 0 {
log.Error("invalid dim")
// TODO: add error handling
}
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{
NumRows: make([]int64, 0, 1),
Data: make([]byte, 0),
Dim: dim,
}
}
fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)
var offset int
for _, blob := range msg.RowData {
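// a binary vector packs 8 dimensions per byte, so each row carries dim/8 bytes (e.g. dim=128 -> 16 bytes)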
bv := blob.GetValue()[pos : pos+(dim/8)]
fieldData.Data = append(fieldData.Data, bv...)
offset = len(bv)
}
pos += offset
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Bool:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.BoolFieldData{
NumRows: make([]int64, 0, 1),
Data: make([]bool, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
var v bool
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
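// every row shares the same layout, so the cursor advances once per field, not once per row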
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int8:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int8FieldData{
NumRows: make([]int64, 0, 1),
Data: make([]int8, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
var v int8
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int16:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int16FieldData{
NumRows: make([]int64, 0, 1),
Data: make([]int16, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
var v int16
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int32:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int32FieldData{
NumRows: make([]int64, 0, 1),
Data: make([]int32, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
var v int32
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Int64:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.Int64FieldData{
NumRows: make([]int64, 0, 1),
Data: make([]int64, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
switch field.FieldID {
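// FieldID 0 and 1 are the system RowID and Timestamp columns; their values travel on the message itself rather than inside the RowData blobs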
case 0: // rowIDs
fieldData.Data = append(fieldData.Data, msg.RowIDs...)
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case 1: // Timestamps
for _, ts := range msg.Timestamps {
fieldData.Data = append(fieldData.Data, int64(ts))
}
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
default:
var v int64
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
}
case schemapb.DataType_Float:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.FloatFieldData{
NumRows: make([]int64, 0, 1),
Data: make([]float32, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
var v float32
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
case schemapb.DataType_Double:
if _, ok := idata.Data[field.FieldID]; !ok {
idata.Data[field.FieldID] = &storage.DoubleFieldData{
NumRows: make([]int64, 0, 1),
Data: make([]float64, 0),
}
}
fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
var v float64
for _, blob := range msg.RowData {
readBinary(blob.GetValue()[pos:], &v, field.DataType)
fieldData.Data = append(fieldData.Data, v)
}
pos += int(unsafe.Sizeof(*(&v)))
fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData)))
}
}
// 1.3 store in buffer
ibNode.insertBuffer.insertData[currentSegID] = idata
// store current endPositions as Segment->EndPosition
ibNode.replica.updateSegmentEndPosition(currentSegID, iMsg.endPositions[0])
// update segment pk filter
ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs())
return nil
}
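// readBinary decodes a single little-endian value from data into receiver;
// on failure it only logs the error together with the offending data type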
func readBinary(data []byte, receiver interface{}, dataType schemapb.DataType) {
buf := bytes.NewReader(data)
err := binary.Read(buf, binary.LittleEndian, receiver)
if err != nil {
log.Error("binary.Read failed", zap.Any("data type", dataType), zap.Error(err))
}
}
func flushSegment(
collMeta *etcdpb.CollectionMeta,
segID, partitionID, collID UniqueID,
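All those repeated bytes.NewReader/binary.Read pairs from the old Operate body collapse into the single readBinary helper above. A self-contained sketch of the same little-endian decoding, standard library only (the helper name and sample bytes here are illustrative):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"log"
)

// readLE mirrors what readBinary does: decode one little-endian value
// from data into receiver, logging rather than returning any error.
func readLE(data []byte, receiver interface{}) {
	if err := binary.Read(bytes.NewReader(data), binary.LittleEndian, receiver); err != nil {
		log.Printf("binary.Read failed: %v", err)
	}
}

func main() {
	raw := []byte{0x00, 0x00, 0x80, 0x3f} // float32(1.0) in little-endian IEEE 754
	var v float32
	readLE(raw, &v)
	fmt.Println(v) // prints 1
}

The second file in the diff adds the unit tests named in the commit title.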


@@ -32,8 +32,10 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
@@ -51,7 +53,7 @@ func (f *CDFMsFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, e
return f.Factory.NewMsgStream(ctx)
}
func TestFLowGraphInsertBufferNodeCreate(t *testing.T) {
func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@@ -560,3 +562,137 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
})
}
// CompactedRootCoord simulates a RootCoord whose meta has been compacted at compactTs:
// DescribeCollection calls with a timestamp at or before compactTs fail
type CompactedRootCoord struct {
types.RootCoord
compactTs Timestamp
}
func (m *CompactedRootCoord) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
if in.GetTimeStamp() <= m.compactTs {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "meta compacted",
},
}, nil
}
return m.RootCoord.DescribeCollection(ctx, in)
}
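// With compactTs = 100, any DescribeCollection at ts <= 100 receives the
// "meta compacted" error status, while later timestamps fall through to the
// embedded RootCoord; this is exactly the boundary the two tests below probe.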
func TestInsertBufferNode_getCollMetaBySegID(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-operate"
testPath := "/test/datanode/root/meta"
err := clearEtcd(testPath)
require.NoError(t, err)
Params.MetaRootPath = testPath
Factory := &MetaFactory{}
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
rcf := &RootCoordFactory{}
mockRootCoord := &CompactedRootCoord{
RootCoord: rcf,
compactTs: 100,
}
replica := newReplica(mockRootCoord, collMeta.ID)
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
require.NoError(t, err)
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarAddress,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
saveBinlog := func(fu *segmentFlushUnit) error {
t.Log(fu)
return nil
}
flushChan := make(chan *flushMsg, 100)
iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string")
require.NoError(t, err)
meta, err := iBNode.getCollMetabySegID(1, 101)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, meta.ID)
_, err = iBNode.getCollMetabySegID(2, 101)
assert.NotNil(t, err)
meta, err = iBNode.getCollMetabySegID(1, 99)
assert.NotNil(t, err)
}
func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-operate"
testPath := "/test/datanode/root/meta"
err := clearEtcd(testPath)
require.NoError(t, err)
Params.MetaRootPath = testPath
Factory := &MetaFactory{}
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
rcf := &RootCoordFactory{}
mockRootCoord := &CompactedRootCoord{
RootCoord: rcf,
compactTs: 100,
}
replica := newReplica(mockRootCoord, collMeta.ID)
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
require.NoError(t, err)
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarAddress,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
saveBinlog := func(fu *segmentFlushUnit) error {
t.Log(fu)
return nil
}
flushChan := make(chan *flushMsg, 100)
iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string")
require.NoError(t, err)
inMsg := genInsertMsg(insertChannelName)
for _, msg := range inMsg.insertMessages {
msg.EndTimestamp = 101 // ts valid
err = iBNode.bufferInsertMsg(&inMsg, msg)
assert.Nil(t, err)
}
for _, msg := range inMsg.insertMessages {
msg.EndTimestamp = 99 // ts invalid
err = iBNode.bufferInsertMsg(&inMsg, msg)
assert.NotNil(t, err)
}
for _, msg := range inMsg.insertMessages {
msg.EndTimestamp = 101 // ts valid
msg.RowIDs = []int64{} //misaligned data
err = iBNode.bufferInsertMsg(&inMsg, msg)
assert.NotNil(t, err)
}
}
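The three loops above push the same message set through a valid timestamp, a compacted timestamp, and a misaligned payload. A table-driven variant of the same checks (a sketch only, reusing iBNode, inMsg, and the testify assertions from the test above) would make that matrix explicit:

cases := []struct {
	name    string
	endTs   Timestamp
	rowIDs  []int64 // nil leaves the generated RowIDs untouched
	wantErr bool
}{
	{"ts valid", 101, nil, false},
	{"ts compacted", 99, nil, true},
	{"misaligned data", 101, []int64{}, true},
}
for _, c := range cases {
	for _, msg := range inMsg.insertMessages {
		msg.EndTimestamp = c.endTs
		if c.rowIDs != nil {
			msg.RowIDs = c.rowIDs
		}
		err := iBNode.bufferInsertMsg(&inMsg, msg)
		assert.Equal(t, c.wantErr, err != nil, c.name)
	}
}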