Add unit test for masterservice

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
pull/4973/head^2
neza2017 2021-04-08 17:31:39 +08:00 committed by yefu.chen
parent 57831b9978
commit 8dd3e1e28f
5 changed files with 282 additions and 22 deletions

View File

@ -56,9 +56,12 @@ type Core struct {
MetaTable *metaTable
//id allocator
idAllocator *allocator.GlobalIDAllocator
idAllocator func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error)
idAllocatorUpdate func() error
//tso allocator
tsoAllocator *tso.GlobalTSOAllocator
tsoAllocator func(count uint32) (typeutil.Timestamp, error)
tsoAllocatorUpdate func() error
//inner members
ctx context.Context
@ -151,9 +154,15 @@ func (c *Core) checkInit() error {
if c.idAllocator == nil {
return fmt.Errorf("idAllocator is nil")
}
if c.idAllocatorUpdate == nil {
return fmt.Errorf("idAllocatorUpdate is nil")
}
if c.tsoAllocator == nil {
return fmt.Errorf("tsoAllocator is nil")
}
if c.tsoAllocatorUpdate == nil {
return fmt.Errorf("tsoAllocatorUpdate is nil")
}
if c.etcdCli == nil {
return fmt.Errorf("etcdCli is nil")
}
@ -208,10 +217,6 @@ func (c *Core) checkInit() error {
if c.ReleaseCollection == nil {
return fmt.Errorf("ReleaseCollection is nil")
}
log.Debug("master", zap.Int64("node id", int64(Params.NodeID)))
log.Debug("master", zap.String("dd channel name", Params.DdChannel))
log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel))
return nil
}
@ -365,11 +370,11 @@ func (c *Core) tsLoop() {
for {
select {
case <-tsoTicker.C:
if err := c.tsoAllocator.UpdateTSO(); err != nil {
if err := c.tsoAllocatorUpdate(); err != nil {
log.Warn("failed to update timestamp: ", zap.Error(err))
continue
}
if err := c.idAllocator.UpdateID(); err != nil {
if err := c.idAllocatorUpdate(); err != nil {
log.Warn("failed to update id: ", zap.Error(err))
continue
}
@ -636,7 +641,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel))
c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
ts, err := c.tsoAllocator.Alloc(1)
ts, err := c.tsoAllocator(1)
if err != nil {
return nil, err
}
@ -664,7 +669,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
}
c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
ts, err := c.tsoAllocator.Alloc(1)
ts, err := c.tsoAllocator(1)
if err != nil {
return 0, err
}
@ -773,14 +778,28 @@ func (c *Core) Init() error {
return
}
c.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid"))
if initError = c.idAllocator.Initialize(); initError != nil {
idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid"))
if initError = idAllocator.Initialize(); initError != nil {
return
}
c.tsoAllocator = tso.NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "tso"))
if initError = c.tsoAllocator.Initialize(); initError != nil {
c.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
return idAllocator.Alloc(count)
}
c.idAllocatorUpdate = func() error {
return idAllocator.UpdateID()
}
tsoAllocator := tso.NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "tso"))
if initError = tsoAllocator.Initialize(); initError != nil {
return
}
c.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) {
return tsoAllocator.Alloc(count)
}
c.tsoAllocatorUpdate = func() error {
return tsoAllocator.UpdateTSO()
}
c.ddReqQueue = make(chan reqTask, 1024)
c.indexTaskQueue = make(chan *CreateIndexTask, 1024)
initError = c.setMsgStreams()
@ -795,6 +814,11 @@ func (c *Core) Start() error {
if err := c.checkInit(); err != nil {
return err
}
log.Debug("master", zap.Int64("node id", int64(Params.NodeID)))
log.Debug("master", zap.String("dd channel name", Params.DdChannel))
log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel))
c.startOnce.Do(func() {
go c.startDdScheduler()
go c.startTimeTickLoop()
@ -1433,7 +1457,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
}
func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) {
ts, err := c.tsoAllocator.Alloc(in.Count)
ts, err := c.tsoAllocator(in.Count)
if err != nil {
log.Debug("AllocTimestamp failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
return &masterpb.AllocTimestampResponse{
@ -1457,7 +1481,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRe
}
func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) {
start, _, err := c.idAllocator.Alloc(in.Count)
start, _, err := c.idAllocator(in.Count)
if err != nil {
log.Debug("AllocID failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
return &masterpb.AllocIDResponse{

View File

@ -10,6 +10,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
@ -22,6 +23,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
)
type proxyMock struct {
@ -1475,4 +1477,169 @@ func TestMasterService(t *testing.T) {
assert.NotEqual(t, rsp8.Status.ErrorCode, commonpb.ErrorCode_Success)
})
t.Run("alloc_error", func(t *testing.T) {
core.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
return 0, 0, fmt.Errorf("id allocator error test")
}
core.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) {
return 0, fmt.Errorf("tso allcoator error test")
}
r1 := &masterpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 5000,
Timestamp: 5000,
SourceID: 5000,
},
Count: 1,
}
p1, err := core.AllocTimestamp(ctx, r1)
assert.Nil(t, err)
assert.NotEqual(t, p1.Status.ErrorCode, commonpb.ErrorCode_Success)
r2 := &masterpb.AllocIDRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined,
MsgID: 3001,
Timestamp: 3001,
SourceID: 3001,
},
Count: 1,
}
p2, err := core.AllocID(ctx, r2)
assert.Nil(t, err)
assert.NotEqual(t, p2.Status.ErrorCode, commonpb.ErrorCode_Success)
})
}
func TestCheckInit(t *testing.T) {
c, err := NewCore(context.Background(), nil)
assert.Nil(t, err)
err = c.Start()
assert.NotNil(t, err)
err = c.checkInit()
assert.NotNil(t, err)
c.MetaTable = &metaTable{}
err = c.checkInit()
assert.NotNil(t, err)
c.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
return 0, 0, nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.idAllocatorUpdate = func() error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) {
return 0, nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.tsoAllocatorUpdate = func() error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.etcdCli = &clientv3.Client{}
err = c.checkInit()
assert.NotNil(t, err)
c.metaKV = &etcdkv.EtcdKV{}
err = c.checkInit()
assert.NotNil(t, err)
c.kvBase = &etcdkv.EtcdKV{}
err = c.checkInit()
assert.NotNil(t, err)
c.ProxyTimeTickChan = make(chan typeutil.Timestamp)
err = c.checkInit()
assert.NotNil(t, err)
c.ddReqQueue = make(chan reqTask)
err = c.checkInit()
assert.NotNil(t, err)
c.DdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.DdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.DdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.DdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo)
err = c.checkInit()
assert.NotNil(t, err)
c.GetBinlogFilePathsFromDataServiceReq = func(segID, fieldID typeutil.UniqueID) ([]string, error) {
return []string{}, nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
return 0, nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.BuildIndexReq = func(ctx context.Context, binlog []string, typeParams, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) {
return 0, nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.DropIndexReq = func(ctx context.Context, indexID typeutil.UniqueID) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.InvalidateCollectionMetaCache = func(ctx context.Context, ts typeutil.Timestamp, dbName, collectionName string) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.indexTaskQueue = make(chan *CreateIndexTask)
err = c.checkInit()
assert.NotNil(t, err)
c.DataNodeSegmentFlushCompletedChan = make(chan int64)
err = c.checkInit()
assert.NotNil(t, err)
c.ReleaseCollection = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error {
return nil
}
err = c.checkInit()
assert.Nil(t, err)
}

View File

@ -183,6 +183,34 @@ func (mt *metaTable) reloadFromKV() error {
return nil
}
func (mt *metaTable) AddTenant(te *pb.TenantMeta) error {
mt.tenantLock.Lock()
defer mt.tenantLock.Unlock()
k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID)
v := proto.MarshalTextString(te)
if err := mt.client.Save(k, v); err != nil {
return err
}
mt.tenantID2Meta[te.ID] = *te
return nil
}
func (mt *metaTable) AddProxy(po *pb.ProxyMeta) error {
mt.proxyLock.Lock()
defer mt.proxyLock.Unlock()
k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
v := proto.MarshalTextString(po)
if err := mt.client.Save(k, v); err != nil {
return err
}
mt.proxyID2Meta[po.ID] = *po
return nil
}
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()

View File

@ -97,6 +97,19 @@ func TestMetaTable(t *testing.T) {
}
t.Run("add collection", func(t *testing.T) {
partInfo.SegmentIDs = []int64{100}
err = mt.AddCollection(collInfo, partInfo, idxInfo)
assert.NotNil(t, err)
partInfo.SegmentIDs = []int64{}
collInfo.PartitionIDs = []int64{100}
err = mt.AddCollection(collInfo, partInfo, idxInfo)
assert.NotNil(t, err)
collInfo.PartitionIDs = []int64{}
err = mt.AddCollection(collInfo, partInfo, nil)
assert.NotNil(t, err)
err = mt.AddCollection(collInfo, partInfo, idxInfo)
assert.Nil(t, err)
@ -104,6 +117,11 @@ func TestMetaTable(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, collMeta.PartitionIDs[0], int64(10))
assert.Equal(t, len(collMeta.PartitionIDs), 1)
assert.True(t, mt.HasCollection(collInfo.ID))
field, err := mt.GetFieldSchema("testColl", "field110")
assert.Nil(t, err)
assert.Equal(t, field.FieldID, collInfo.Schema.Fields[0].FieldID)
})
t.Run("add segment", func(t *testing.T) {
@ -215,6 +233,22 @@ func TestMetaTable(t *testing.T) {
assert.Zero(t, len(idx))
})
t.Run("reload meta", func(t *testing.T) {
te := pb.TenantMeta{
ID: 100,
}
err := mt.AddTenant(&te)
assert.Nil(t, err)
po := pb.ProxyMeta{
ID: 101,
}
err = mt.AddProxy(&po)
assert.Nil(t, err)
_, err = NewMetaTable(ekv)
assert.Nil(t, err)
})
t.Run("drop index", func(t *testing.T) {
idx, ok, err := mt.DropIndex("testColl", "field110", "field110")
assert.Nil(t, err)
@ -239,4 +273,11 @@ func TestMetaTable(t *testing.T) {
})
t.Run("drop collection", func(t *testing.T) {
err := mt.DeleteCollection(2)
assert.NotNil(t, err)
err = mt.DeleteCollection(1)
assert.Nil(t, err)
})
}

View File

@ -38,9 +38,9 @@ func (bt *baseReqTask) Notify(err error) {
func (bt *baseReqTask) WaitToFinish() error {
select {
case <-bt.core.ctx.Done():
return fmt.Errorf("core context done, %s", bt.core.ctx.Err().Error())
return fmt.Errorf("core context done, %w", bt.core.ctx.Err())
case <-bt.ctx.Done():
return fmt.Errorf("request context done, %s", bt.ctx.Err().Error())
return fmt.Errorf("request context done, %w", bt.ctx.Err())
case err, ok := <-bt.cv:
if !ok {
return fmt.Errorf("notify chan closed")
@ -103,7 +103,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
}
schema.Fields = append(schema.Fields, rowIDField, timeStampField)
collID, err := t.core.idAllocator.AllocOne()
collID, _, err := t.core.idAllocator(1)
if err != nil {
return err
}
@ -111,7 +111,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
partitionID, err := t.core.idAllocator.AllocOne()
partitionID, _, err := t.core.idAllocator(1)
if err != nil {
return err
}
@ -412,7 +412,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
partitionID, err := t.core.idAllocator.AllocOne()
partitionID, _, err := t.core.idAllocator(1)
if err != nil {
return err
}
@ -711,7 +711,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("create index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
indexName := Params.DefaultIndexName //TODO, get name from request
indexID, err := t.core.idAllocator.AllocOne()
indexID, _, err := t.core.idAllocator(1)
if err != nil {
return err
}