Add UT for binlog_io to reach 100% coverage (#12283)

Bring DataNode UT coverage up to 90%
Resolves: #8058

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2021-11-26 17:43:17 +08:00 committed by GitHub
parent 0ed12dab14
commit 48b45d82e5
5 changed files with 235 additions and 40 deletions
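The substantive change under test is in binlog_io.go: the hand-rolled goroutine-and-channel retry inside download/upload is replaced with golang.org/x/sync/errgroup, so a cancelled context now surfaces as a returned error instead of a closed channel. A minimal runnable sketch of the adopted pattern, with a hypothetical attempt callback standing in for MultiLoad/MultiSave:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// errStart marks "no attempt made yet", mirroring binlog_io.go.
var errStart = errors.New("start")

// retryUntilDone is a stand-in for the loop inside download/upload:
// keep calling attempt until it succeeds or the context ends.
func retryUntilDone(ctx context.Context, attempt func() error) error {
	err := errStart
	g, gCtx := errgroup.WithContext(ctx)
	g.Go(func() error {
		for err != nil {
			select {
			case <-gCtx.Done():
				// Cancellation surfaces as an error from Wait,
				// instead of leaving a reader blocked on a channel.
				return gCtx.Err()
			default:
				if err != errStart {
					<-time.After(50 * time.Millisecond) // brief backoff between retries
				}
				err = attempt()
			}
		}
		return nil
	})
	// Wait returns the first non-nil error returned by the group.
	return g.Wait()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()

	attempts := 0
	err := retryUntilDone(ctx, func() error {
		attempts++
		return errors.New("blob storage unavailable") // always fails
	})
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}

That error-propagating g.Wait() is what lets the new tests below drive the timeout paths deterministically with a 20ms deadline.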

View File

@@ -22,6 +22,7 @@ import (
 	"errors"
 	"path"
 	"strconv"
+	"time"
 
 	"github.com/milvus-io/milvus/internal/kv"
 	"github.com/milvus-io/milvus/internal/log"
@@ -31,6 +32,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )
 
 var (
@@ -61,38 +63,38 @@ var _ downloader = (*binlogIO)(nil)
 var _ uploader = (*binlogIO)(nil)
 
 func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error) {
-	var err error = errStart
+	var (
+		err = errStart
+		vs  = []string{}
+	)
-	r := make(chan []string)
-	go func(r chan<- []string) {
-		var vs []string
+
+	g, gCtx := errgroup.WithContext(ctx)
+	g.Go(func() error {
 		for err != nil {
 			select {
-			case <-ctx.Done():
-				close(r)
+			case <-gCtx.Done():
 				log.Warn("ctx done when downloading kvs from blob storage")
-				return
+				return errDownloadFromBlobStorage
 			default:
 				if err != errStart {
+					<-time.After(50 * time.Millisecond)
 					log.Warn("Try multiloading again", zap.Strings("paths", paths))
 				}
 				vs, err = b.MultiLoad(paths)
 			}
 		}
-		r <- vs
-	}(r)
+		return nil
+	})
 
-	vs, ok := <-r
-	if !ok {
-		return nil, errDownloadFromBlobStorage
+	if err := g.Wait(); err != nil {
+		return nil, err
 	}
 
-	rst := make([]*Blob, 0, len(vs))
-	for _, vstr := range vs {
-		b := bytes.NewBufferString(vstr)
-		rst = append(rst, &Blob{Value: b.Bytes()})
+	rst := make([]*Blob, len(vs))
+	for i := range rst {
+		rst[i] = &Blob{Value: bytes.NewBufferString(vs[i]).Bytes()}
 	}
 
 	return rst, nil
@@ -148,27 +150,27 @@ func (b *binlogIO) upload(
 		p.deltaInfo.DeltaLogPath = k
 	}
 
-	success := make(chan struct{})
-	go func(success chan<- struct{}) {
-		err := errStart
+	var err = errStart
+
+	g, gCtx := errgroup.WithContext(ctx)
+	g.Go(func() error {
 		for err != nil {
 			select {
-			case <-ctx.Done():
-				close(success)
+			case <-gCtx.Done():
 				log.Warn("ctx done when saving kvs to blob storage")
-				return
+				return errUploadToBlobStorage
 			default:
 				if err != errStart {
+					<-time.After(50 * time.Millisecond)
 					log.Info("retry save binlogs")
 				}
 				err = b.MultiSave(kvs)
 			}
 		}
-		success <- struct{}{}
-	}(success)
+		return nil
+	})
 
-	if _, ok := <-success; !ok {
-		return nil, errUploadToBlobStorage
+	if err := g.Wait(); err != nil {
+		return nil, err
 	}
 
 	return p, nil
@@ -214,11 +216,8 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
 	}
 
 	for _, blob := range inlogs {
-		fID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
-		if err != nil {
-			log.Error("can not parse string to fieldID", zap.Error(err))
-			return nil, nil, nil, err
-		}
+		// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
+		fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
 
 		k := JoinIDPath(meta.GetID(), partID, segID, fID, <-generator)
 		key := path.Join(Params.InsertBinlogRootPath, k)
@@ -230,11 +229,8 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
 	}
 
 	for _, blob := range statslogs {
-		fID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
-		if err != nil {
-			log.Error("can not parse string to fieldID", zap.Error(err))
-			return nil, nil, nil, err
-		}
+		// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
+		fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
 
 		k := JoinIDPath(meta.GetID(), partID, segID, fID, <-generator)
 		key := path.Join(Params.StatsBinlogRootPath, k)
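The dropped error branch leans on the invariant stated in the new comment: blob keys are produced by Serialize via FormatInt on an int64 fieldID, so ParseInt on them cannot fail. A tiny standalone illustration of that round trip (the fieldID value is made up):

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Serialize builds blob keys with FormatInt on an int64 fieldID from the
	// collection schema, so parsing the key back can never fail.
	key := strconv.FormatInt(106, 10) // "106", hypothetical fieldID
	fID, err := strconv.ParseInt(key, 10, 64)
	fmt.Println(fID, err) // 106 <nil>
}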

View File

@@ -18,8 +18,10 @@ package datanode
 import (
 	"context"
+	"errors"
 	"path"
 	"testing"
+	"time"
 
 	"github.com/milvus-io/milvus/internal/kv"
 	memkv "github.com/milvus-io/milvus/internal/kv/mem"
@@ -59,6 +61,44 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 		assert.Nil(t, p)
 	})
 
+	t.Run("Test upload error", func(t *testing.T) {
+		f := &MetaFactory{}
+		meta := f.GetCollectionMeta(UniqueID(10001), "uploads")
+
+		dData := &DeleteData{
+			Pks: []int64{},
+			Tss: []uint64{},
+		}
+		iData := genEmptyInsertData()
+		p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
+		assert.Error(t, err)
+		assert.Empty(t, p)
+
+		iData = genInsertData()
+		dData = &DeleteData{
+			Pks:      []int64{},
+			Tss:      []uint64{1},
+			RowCount: 1,
+		}
+		p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
+		assert.Error(t, err)
+		assert.Empty(t, p)
+
+		mkv := &mockKv{errMultiSave: true}
+		bin := &binlogIO{mkv, alloc}
+		iData = genInsertData()
+		dData = &DeleteData{
+			Pks:      []int64{1},
+			Tss:      []uint64{1},
+			RowCount: 1,
+		}
+		ctx, cancel := context.WithTimeout(context.TODO(), 20*time.Millisecond)
+		p, err = bin.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
+		assert.Error(t, err)
+		assert.Empty(t, p)
+		cancel()
+	})
+
 	t.Run("Test download", func(t *testing.T) {
 		tests := []struct {
 			isvalid bool
@@ -99,7 +139,17 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 				}
 			})
 		}
 	})
+
+	t.Run("Test download twice", func(t *testing.T) {
+		mkv := &mockKv{errMultiLoad: true}
+		b := &binlogIO{mkv, alloc}
+		ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond*20)
+		blobs, err := b.download(ctx, []string{"a"})
+		assert.Error(t, err)
+		assert.Empty(t, blobs)
+		cancel()
+	})
 }
@@ -152,7 +202,22 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 				}
 			})
 		}
 	})
+
+	t.Run("Test genDeltaBlobs error", func(t *testing.T) {
+		k, v, err := b.genDeltaBlobs(&DeleteData{Pks: []int64{1}, Tss: []uint64{}}, 1, 1, 1)
+		assert.Error(t, err)
+		assert.Empty(t, k)
+		assert.Empty(t, v)
+
+		errAlloc := NewAllocatorFactory()
+		errAlloc.isvalid = false
+		bin := binlogIO{memkv.NewMemoryKV(), errAlloc}
+		k, v, err = bin.genDeltaBlobs(&DeleteData{Pks: []int64{1}, Tss: []uint64{1}}, 1, 1, 1)
+		assert.Error(t, err)
+		assert.Empty(t, k)
+		assert.Empty(t, v)
+	})
 
 	t.Run("Test genInsertBlobs", func(t *testing.T) {
@@ -172,6 +237,33 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 			zap.String("stats paths field0", pstats[0].GetBinlogs()[0]))
 	})
 
+	t.Run("Test genInsertBlobs error", func(t *testing.T) {
+		kvs, pin, pstats, err := b.genInsertBlobs(&InsertData{}, 1, 1, nil)
+		assert.Error(t, err)
+		assert.Empty(t, kvs)
+		assert.Empty(t, pin)
+		assert.Empty(t, pstats)
+
+		f := &MetaFactory{}
+		meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs")
+
+		kvs, pin, pstats, err = b.genInsertBlobs(genEmptyInsertData(), 10, 1, meta)
+		assert.Error(t, err)
+		assert.Empty(t, kvs)
+		assert.Empty(t, pin)
+		assert.Empty(t, pstats)
+
+		errAlloc := NewAllocatorFactory()
+		errAlloc.errAllocBatch = true
+		bin := &binlogIO{memkv.NewMemoryKV(), errAlloc}
+		kvs, pin, pstats, err = bin.genInsertBlobs(genInsertData(), 10, 1, meta)
+		assert.Error(t, err)
+		assert.Empty(t, kvs)
+		assert.Empty(t, pin)
+		assert.Empty(t, pstats)
+	})
+
 	t.Run("Test idxGenerator", func(t *testing.T) {
 		tests := []struct {
 			isvalid bool
@@ -224,3 +316,31 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 		})
 	}
 }
+
+type mockKv struct {
+	errMultiLoad bool
+	errMultiSave bool
+}
+
+var _ kv.BaseKV = (*mockKv)(nil)
+
+func (mk *mockKv) Load(key string) (string, error) { return "", nil }
+func (mk *mockKv) MultiLoad(keys []string) ([]string, error) {
+	if mk.errMultiLoad {
+		return []string{}, errors.New("mockKv multiload error")
+	}
+	return []string{"a"}, nil
+}
+
+func (mk *mockKv) LoadWithPrefix(key string) ([]string, []string, error) { return nil, nil, nil }
+func (mk *mockKv) Save(key, value string) error { return nil }
+func (mk *mockKv) MultiSave(kvs map[string]string) error {
+	if mk.errMultiSave {
+		return errors.New("mockKv multisave error")
+	}
+	return nil
+}
+
+func (mk *mockKv) Remove(key string) error { return nil }
+func (mk *mockKv) MultiRemove(keys []string) error { return nil }
+func (mk *mockKv) RemoveWithPrefix(key string) error { return nil }
+func (mk *mockKv) Close() {}
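A note on the design: binlogIO depends only on the kv.BaseKV interface, so this small stub with toggleable MultiLoad/MultiSave failures is all the tests need to force the retry loops into their context-done branches. A hypothetical condensed form of that wiring, using the names from the tests above:

func TestDownloadRetryTimesOut(t *testing.T) {
	// A KV whose MultiLoad always fails keeps the retry loop spinning, so a
	// short deadline must end the call with an error rather than a hang.
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()

	b := &binlogIO{&mockKv{errMultiLoad: true}, NewAllocatorFactory()}
	blobs, err := b.download(ctx, []string{"a"})
	assert.Error(t, err)
	assert.Empty(t, blobs)
}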

View File

@@ -533,9 +533,10 @@ func genFlowGraphDeleteMsg(pks []int64, chanName string) flowGraphMsg {
 type AllocatorFactory struct {
 	sync.Mutex
-	r       *rand.Rand
-	isvalid bool
-	random  bool
+	r             *rand.Rand
+	isvalid       bool
+	random        bool
+	errAllocBatch bool
 }
 
 var _ allocatorInterface = &AllocatorFactory{}
@@ -564,7 +565,7 @@ func (alloc *AllocatorFactory) allocID() (UniqueID, error) {
 }
 
 func (alloc *AllocatorFactory) allocIDBatch(count uint32) (UniqueID, uint32, error) {
-	if count == 0 {
+	if count == 0 || alloc.errAllocBatch {
 		return 0, 0, errors.New("count should be greater than zero")
 	}
@@ -749,3 +750,55 @@ func genInsertData() *InsertData {
 		},
 	}}
 }
+
+func genEmptyInsertData() *InsertData {
+	return &InsertData{
+		Data: map[int64]s.FieldData{
+			0: &s.Int64FieldData{
+				NumRows: []int64{0},
+				Data:    []int64{},
+			},
+			1: &s.Int64FieldData{
+				NumRows: []int64{0},
+				Data:    []int64{},
+			},
+			100: &s.FloatVectorFieldData{
+				NumRows: []int64{0},
+				Data:    []float32{},
+				Dim:     2,
+			},
+			101: &s.BinaryVectorFieldData{
+				NumRows: []int64{0},
+				Data:    []byte{},
+				Dim:     32,
+			},
+			102: &s.BoolFieldData{
+				NumRows: []int64{0},
+				Data:    []bool{},
+			},
+			103: &s.Int8FieldData{
+				NumRows: []int64{0},
+				Data:    []int8{},
+			},
+			104: &s.Int16FieldData{
+				NumRows: []int64{0},
+				Data:    []int16{},
+			},
+			105: &s.Int32FieldData{
+				NumRows: []int64{0},
+				Data:    []int32{},
+			},
+			106: &s.Int64FieldData{
+				NumRows: []int64{0},
+				Data:    []int64{},
+			},
+			107: &s.FloatFieldData{
+				NumRows: []int64{0},
+				Data:    []float32{},
+			},
+			108: &s.DoubleFieldData{
+				NumRows: []int64{0},
+				Data:    []float64{},
+			},
+		}}
+}

View File

@@ -263,6 +263,10 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 	if !ok {
 		return nil, nil, fmt.Errorf("data doesn't contains timestamp field")
 	}
+	if timeFieldData.RowNum() <= 0 {
+		return nil, nil, fmt.Errorf("There's no data in InsertData")
+	}
+
 	ts := timeFieldData.(*Int64FieldData).Data
 	startTs := ts[0]
 	endTs := ts[len(ts)-1]
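The new guard matters because the lines just below it index ts unconditionally: for an empty InsertData, like the genEmptyInsertData fixture above, startTs := ts[0] would panic instead of returning an error. A small standalone sketch of the failure mode the check closes off:

package main

import "fmt"

func main() {
	ts := []int64{} // the timestamp column of an empty InsertData

	// Pre-guard: ts[0] below panics with "index out of range".
	// Post-guard: Serialize returns an error before reaching it.
	if len(ts) == 0 {
		fmt.Println("error: no data in InsertData")
		return
	}
	fmt.Println(ts[0], ts[len(ts)-1]) // startTs, endTs
}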

View File

@@ -251,6 +251,28 @@ func TestInsertCodec(t *testing.T) {
 			},
 		},
 	}
+
+	insertDataEmpty := &InsertData{
+		Data: map[int64]FieldData{
+			RowIDField:        &Int64FieldData{[]int64{}, []int64{}},
+			TimestampField:    &Int64FieldData{[]int64{}, []int64{}},
+			BoolField:         &BoolFieldData{[]int64{}, []bool{}},
+			Int8Field:         &Int8FieldData{[]int64{}, []int8{}},
+			Int16Field:        &Int16FieldData{[]int64{}, []int16{}},
+			Int32Field:        &Int32FieldData{[]int64{}, []int32{}},
+			Int64Field:        &Int64FieldData{[]int64{}, []int64{}},
+			FloatField:        &FloatFieldData{[]int64{}, []float32{}},
+			DoubleField:       &DoubleFieldData{[]int64{}, []float64{}},
+			StringField:       &StringFieldData{[]int64{}, []string{}},
+			BinaryVectorField: &BinaryVectorFieldData{[]int64{}, []byte{}, 8},
+			FloatVectorField:  &FloatVectorFieldData{[]int64{}, []float32{}, 4},
+		},
+	}
+
+	b, s, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty)
+	assert.Error(t, err)
+	assert.Empty(t, b)
+	assert.Empty(t, s)
+
 	Blobs1, statsBlob1, err := insertCodec.Serialize(PartitionID, SegmentID, insertData1)
 	assert.Nil(t, err)
 	for _, blob := range Blobs1 {