Add ut in datasync service to 88% (#7615)

See also: #6357

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/7615/merge
XuanYang-cn 2021-09-09 15:36:01 +08:00 committed by GitHub
parent 69794fd32d
commit 64aad49959
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 160 additions and 25 deletions

View File

@ -13,6 +13,7 @@ package datanode
import (
"context"
"errors"
"fmt"
"github.com/milvus-io/milvus/internal/log"
@ -36,6 +37,8 @@ type dataSyncService struct {
collectionID UniqueID
dataCoord types.DataCoord
clearSignal chan<- UniqueID
saveBinlog func(fu *segmentFlushUnit) error
}
func newDataSyncService(ctx context.Context,
@ -49,6 +52,10 @@ func newDataSyncService(ctx context.Context,
) (*dataSyncService, error) {
if replica == nil {
return nil, errors.New("Nil input")
}
ctx1, cancel := context.WithCancel(ctx)
service := &dataSyncService{
@ -147,6 +154,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return nil
}
dsService.saveBinlog = saveBinlog
var dmStreamNode Node = newDmInputNode(
dsService.ctx,
dsService.msFactory,
@ -169,10 +178,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return err
}
dn, err := newDeleteDNode(dsService.replica)
if err != nil {
return err
}
dn := newDeleteDNode(dsService.replica)
var deleteNode Node = dn

View File

@ -25,6 +25,126 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
func getVchanInfo(cp bool, collID, ufCollID, ufSegID UniqueID, chanName, ufchanName string, ufNor int64) *datapb.VchannelInfo {
var ufs []*datapb.SegmentInfo
if cp {
ufs = []*datapb.SegmentInfo{{
CollectionID: ufCollID,
PartitionID: 1,
InsertChannel: ufchanName,
ID: ufSegID,
NumOfRows: ufNor,
DmlPosition: &internalpb.MsgPosition{},
}}
} else {
ufs = []*datapb.SegmentInfo{}
}
vi := &datapb.VchannelInfo{
CollectionID: collID,
ChannelName: chanName,
SeekPosition: &internalpb.MsgPosition{},
UnflushedSegments: ufs,
FlushedSegments: []int64{},
}
return vi
}
func TestDataSyncService_newDataSyncService(te *testing.T) {
ctx := context.Background()
tests := []struct {
isValidCase bool
replicaNil bool
inMsgFactory msgstream.Factory
collID UniqueID
ufCollID UniqueID
ufSegID UniqueID
chanName string
ufchanName string
ufNor int64
description string
}{
{false, false, &mockMsgStreamFactory{false, true},
0, 0, 0, "", "", 0,
"SetParamsReturnError"},
{true, false, &mockMsgStreamFactory{true, true},
0, 1, 0, "", "", 0,
"CollID 0 mismach with seginfo collID 1"},
{true, false, &mockMsgStreamFactory{true, true},
1, 1, 0, "c1", "c2", 0,
"chanName c1 mismach with seginfo chanName c2"},
{true, false, &mockMsgStreamFactory{true, true},
1, 1, 0, "c1", "c1", 0,
"add normal segments"},
{false, false, &mockMsgStreamFactory{true, false},
0, 0, 0, "", "", 0,
"error when newinsertbufernode"},
{false, true, &mockMsgStreamFactory{true, false},
0, 0, 0, "", "", 0,
"replica nil"},
}
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
df := &DataCoordFactory{}
replica := newReplica(&RootCoordFactory{}, test.collID)
if test.replicaNil {
replica = nil
}
ds, err := newDataSyncService(ctx,
make(chan *flushMsg),
replica,
NewAllocatorFactory(),
test.inMsgFactory,
getVchanInfo(test.isValidCase, test.collID, test.ufCollID, test.ufSegID, test.chanName, test.ufchanName, test.ufNor),
make(chan UniqueID),
df,
)
if !test.isValidCase {
assert.Error(t, err)
assert.Nil(t, ds)
} else {
assert.NoError(t, err)
assert.NotNil(t, ds)
// save binlog
fu := &segmentFlushUnit{
collID: 1,
segID: 100,
field2Path: map[UniqueID]string{100: "path1"},
checkPoint: map[UniqueID]segmentCheckPoint{100: {100, internalpb.MsgPosition{}}},
}
df.SaveBinlogPathError = true
err := ds.saveBinlog(fu)
assert.Error(t, err)
df.SaveBinlogPathError = false
df.SaveBinlogPathNotSucess = true
err = ds.saveBinlog(fu)
assert.Error(t, err)
df.SaveBinlogPathError = false
df.SaveBinlogPathNotSucess = false
err = ds.saveBinlog(fu)
assert.NoError(t, err)
// start
ds.fg = nil
ds.start()
}
})
}
}
// NOTE: start pulsar before test
func TestDataSyncService_Start(t *testing.T) {
t.Skip()

View File

@ -72,16 +72,12 @@ func getSegmentsByPKs(pks []int64, segments []*Segment) (map[int64][]int64, erro
return results, nil
}
func newDeleteDNode(replica Replica) (*deleteNode, error) {
func newDeleteDNode(replica Replica) *deleteNode {
baseNode := BaseNode{}
baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength)
if replica == nil {
return nil, errors.New("Nill input replica")
}
return &deleteNode{
BaseNode: baseNode,
replica: replica,
}, nil
}
}

View File

@ -23,27 +23,18 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
tests := []struct {
replica Replica
expectedErr bool
description string
}{
{nil, true, "Nil input"},
{&SegmentReplica{}, false, "pointer of SegmentReplica"},
{&SegmentReplica{}, "pointer of SegmentReplica"},
}
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
dn, err := newDeleteDNode(test.replica)
dn := newDeleteDNode(test.replica)
if test.expectedErr {
assert.Error(t, err)
assert.Nil(t, dn)
} else {
assert.NoError(t, err)
assert.NotNil(t, dn)
assert.Equal(t, "deleteNode", dn.Name())
dn.Close()
}
assert.NotNil(t, dn)
assert.Equal(t, "deleteNode", dn.Name())
dn.Close()
})
}

View File

@ -13,6 +13,7 @@ package datanode
import (
"context"
"errors"
"testing"
"github.com/milvus-io/milvus/internal/msgstream"
@ -20,14 +21,25 @@ import (
)
type mockMsgStreamFactory struct {
SetParamsReturnNil bool
NewMsgStreamNoError bool
}
var _ msgstream.Factory = &mockMsgStreamFactory{}
func (mm *mockMsgStreamFactory) SetParams(params map[string]interface{}) error {
if !mm.SetParamsReturnNil {
return errors.New("Set Params Error")
}
return nil
}
func (mm *mockMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
return nil, nil
if !mm.NewMsgStreamNoError {
return nil, errors.New("New MsgStream error")
}
return &mockTtMsgStream{}, nil
}
func (mm *mockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) {

View File

@ -161,9 +161,19 @@ type RootCoordFactory struct {
type DataCoordFactory struct {
types.DataCoord
SaveBinlogPathError bool
SaveBinlogPathNotSucess bool
}
func (ds *DataCoordFactory) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
if ds.SaveBinlogPathError {
return nil, errors.New("Error")
}
if ds.SaveBinlogPathNotSucess {
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil
}
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}