From 8dd3e1e28ffc8cf311909109aaa71dfd58cb7206 Mon Sep 17 00:00:00 2001 From: neza2017 Date: Thu, 8 Apr 2021 17:31:39 +0800 Subject: [PATCH] Add unit test for masterservice Signed-off-by: neza2017 --- internal/masterservice/master_service.go | 56 ++++-- internal/masterservice/master_service_test.go | 167 ++++++++++++++++++ internal/masterservice/meta_table.go | 28 +++ internal/masterservice/meta_table_test.go | 41 +++++ internal/masterservice/task.go | 12 +- 5 files changed, 282 insertions(+), 22 deletions(-) diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 14a71c81fc..9cb39106e9 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -56,9 +56,12 @@ type Core struct { MetaTable *metaTable //id allocator - idAllocator *allocator.GlobalIDAllocator + idAllocator func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) + idAllocatorUpdate func() error + //tso allocator - tsoAllocator *tso.GlobalTSOAllocator + tsoAllocator func(count uint32) (typeutil.Timestamp, error) + tsoAllocatorUpdate func() error //inner members ctx context.Context @@ -151,9 +154,15 @@ func (c *Core) checkInit() error { if c.idAllocator == nil { return fmt.Errorf("idAllocator is nil") } + if c.idAllocatorUpdate == nil { + return fmt.Errorf("idAllocatorUpdate is nil") + } if c.tsoAllocator == nil { return fmt.Errorf("tsoAllocator is nil") } + if c.tsoAllocatorUpdate == nil { + return fmt.Errorf("tsoAllocatorUpdate is nil") + } if c.etcdCli == nil { return fmt.Errorf("etcdCli is nil") } @@ -208,10 +217,6 @@ func (c *Core) checkInit() error { if c.ReleaseCollection == nil { return fmt.Errorf("ReleaseCollection is nil") } - - log.Debug("master", zap.Int64("node id", int64(Params.NodeID))) - log.Debug("master", zap.String("dd channel name", Params.DdChannel)) - log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel)) return nil } @@ -365,11 +370,11 @@ func (c *Core) tsLoop() { for { select { case <-tsoTicker.C: - if err := c.tsoAllocator.UpdateTSO(); err != nil { + if err := c.tsoAllocatorUpdate(); err != nil { log.Warn("failed to update timestamp: ", zap.Error(err)) continue } - if err := c.idAllocator.UpdateID(); err != nil { + if err := c.idAllocatorUpdate(); err != nil { log.Warn("failed to update id: ", zap.Error(err)) continue } @@ -636,7 +641,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel)) c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { - ts, err := c.tsoAllocator.Alloc(1) + ts, err := c.tsoAllocator(1) if err != nil { return nil, err } @@ -664,7 +669,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { } c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) { - ts, err := c.tsoAllocator.Alloc(1) + ts, err := c.tsoAllocator(1) if err != nil { return 0, err } @@ -773,14 +778,28 @@ func (c *Core) Init() error { return } - c.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid")) - if initError = c.idAllocator.Initialize(); initError != nil { + idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid")) + if initError = idAllocator.Initialize(); initError != nil { return } - c.tsoAllocator = tso.NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "tso")) - if initError = c.tsoAllocator.Initialize(); initError != nil { + c.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) { + return idAllocator.Alloc(count) + } + c.idAllocatorUpdate = func() error { + return idAllocator.UpdateID() + } + + tsoAllocator := tso.NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "tso")) + if initError = tsoAllocator.Initialize(); initError != nil { return } + c.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) { + return tsoAllocator.Alloc(count) + } + c.tsoAllocatorUpdate = func() error { + return tsoAllocator.UpdateTSO() + } + c.ddReqQueue = make(chan reqTask, 1024) c.indexTaskQueue = make(chan *CreateIndexTask, 1024) initError = c.setMsgStreams() @@ -795,6 +814,11 @@ func (c *Core) Start() error { if err := c.checkInit(); err != nil { return err } + + log.Debug("master", zap.Int64("node id", int64(Params.NodeID))) + log.Debug("master", zap.String("dd channel name", Params.DdChannel)) + log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel)) + c.startOnce.Do(func() { go c.startDdScheduler() go c.startTimeTickLoop() @@ -1433,7 +1457,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques } func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) { - ts, err := c.tsoAllocator.Alloc(in.Count) + ts, err := c.tsoAllocator(in.Count) if err != nil { log.Debug("AllocTimestamp failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &masterpb.AllocTimestampResponse{ @@ -1457,7 +1481,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRe } func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) { - start, _, err := c.idAllocator.Alloc(in.Count) + start, _, err := c.idAllocator(in.Count) if err != nil { log.Debug("AllocID failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &masterpb.AllocIDResponse{ diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 601d18cf75..b40aa1a1d7 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -10,6 +10,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" + etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -22,6 +23,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/types" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "go.etcd.io/etcd/clientv3" ) type proxyMock struct { @@ -1475,4 +1477,169 @@ func TestMasterService(t *testing.T) { assert.NotEqual(t, rsp8.Status.ErrorCode, commonpb.ErrorCode_Success) }) + + t.Run("alloc_error", func(t *testing.T) { + core.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) { + return 0, 0, fmt.Errorf("id allocator error test") + } + core.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) { + return 0, fmt.Errorf("tso allcoator error test") + } + r1 := &masterpb.AllocTimestampRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 5000, + Timestamp: 5000, + SourceID: 5000, + }, + Count: 1, + } + p1, err := core.AllocTimestamp(ctx, r1) + assert.Nil(t, err) + assert.NotEqual(t, p1.Status.ErrorCode, commonpb.ErrorCode_Success) + + r2 := &masterpb.AllocIDRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, + MsgID: 3001, + Timestamp: 3001, + SourceID: 3001, + }, + Count: 1, + } + p2, err := core.AllocID(ctx, r2) + assert.Nil(t, err) + assert.NotEqual(t, p2.Status.ErrorCode, commonpb.ErrorCode_Success) + }) +} + +func TestCheckInit(t *testing.T) { + c, err := NewCore(context.Background(), nil) + assert.Nil(t, err) + + err = c.Start() + assert.NotNil(t, err) + + err = c.checkInit() + assert.NotNil(t, err) + + c.MetaTable = &metaTable{} + err = c.checkInit() + assert.NotNil(t, err) + + c.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) { + return 0, 0, nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.idAllocatorUpdate = func() error { + return nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) { + return 0, nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.tsoAllocatorUpdate = func() error { + return nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.etcdCli = &clientv3.Client{} + err = c.checkInit() + assert.NotNil(t, err) + + c.metaKV = &etcdkv.EtcdKV{} + err = c.checkInit() + assert.NotNil(t, err) + + c.kvBase = &etcdkv.EtcdKV{} + err = c.checkInit() + assert.NotNil(t, err) + + c.ProxyTimeTickChan = make(chan typeutil.Timestamp) + err = c.checkInit() + assert.NotNil(t, err) + + c.ddReqQueue = make(chan reqTask) + err = c.checkInit() + assert.NotNil(t, err) + + c.DdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error { + return nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.DdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error { + return nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.DdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error { + return nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.DdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error { + return nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo) + err = c.checkInit() + assert.NotNil(t, err) + + c.GetBinlogFilePathsFromDataServiceReq = func(segID, fieldID typeutil.UniqueID) ([]string, error) { + return []string{}, nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) { + return 0, nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.BuildIndexReq = func(ctx context.Context, binlog []string, typeParams, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) { + return 0, nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.DropIndexReq = func(ctx context.Context, indexID typeutil.UniqueID) error { + return nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.InvalidateCollectionMetaCache = func(ctx context.Context, ts typeutil.Timestamp, dbName, collectionName string) error { + return nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.indexTaskQueue = make(chan *CreateIndexTask) + err = c.checkInit() + assert.NotNil(t, err) + + c.DataNodeSegmentFlushCompletedChan = make(chan int64) + err = c.checkInit() + assert.NotNil(t, err) + + c.ReleaseCollection = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error { + return nil + } + err = c.checkInit() + assert.Nil(t, err) } diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index bdb1c91d60..9fdf086bb9 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -183,6 +183,34 @@ func (mt *metaTable) reloadFromKV() error { return nil } +func (mt *metaTable) AddTenant(te *pb.TenantMeta) error { + mt.tenantLock.Lock() + defer mt.tenantLock.Unlock() + + k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID) + v := proto.MarshalTextString(te) + + if err := mt.client.Save(k, v); err != nil { + return err + } + mt.tenantID2Meta[te.ID] = *te + return nil +} + +func (mt *metaTable) AddProxy(po *pb.ProxyMeta) error { + mt.proxyLock.Lock() + defer mt.proxyLock.Unlock() + + k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID) + v := proto.MarshalTextString(po) + + if err := mt.client.Save(k, v); err != nil { + return err + } + mt.proxyID2Meta[po.ID] = *po + return nil +} + func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index 271b02f3cf..ea1be17a1d 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -97,6 +97,19 @@ func TestMetaTable(t *testing.T) { } t.Run("add collection", func(t *testing.T) { + partInfo.SegmentIDs = []int64{100} + err = mt.AddCollection(collInfo, partInfo, idxInfo) + assert.NotNil(t, err) + partInfo.SegmentIDs = []int64{} + + collInfo.PartitionIDs = []int64{100} + err = mt.AddCollection(collInfo, partInfo, idxInfo) + assert.NotNil(t, err) + collInfo.PartitionIDs = []int64{} + + err = mt.AddCollection(collInfo, partInfo, nil) + assert.NotNil(t, err) + err = mt.AddCollection(collInfo, partInfo, idxInfo) assert.Nil(t, err) @@ -104,6 +117,11 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) assert.Equal(t, collMeta.PartitionIDs[0], int64(10)) assert.Equal(t, len(collMeta.PartitionIDs), 1) + assert.True(t, mt.HasCollection(collInfo.ID)) + + field, err := mt.GetFieldSchema("testColl", "field110") + assert.Nil(t, err) + assert.Equal(t, field.FieldID, collInfo.Schema.Fields[0].FieldID) }) t.Run("add segment", func(t *testing.T) { @@ -215,6 +233,22 @@ func TestMetaTable(t *testing.T) { assert.Zero(t, len(idx)) }) + t.Run("reload meta", func(t *testing.T) { + te := pb.TenantMeta{ + ID: 100, + } + err := mt.AddTenant(&te) + assert.Nil(t, err) + po := pb.ProxyMeta{ + ID: 101, + } + err = mt.AddProxy(&po) + assert.Nil(t, err) + + _, err = NewMetaTable(ekv) + assert.Nil(t, err) + }) + t.Run("drop index", func(t *testing.T) { idx, ok, err := mt.DropIndex("testColl", "field110", "field110") assert.Nil(t, err) @@ -239,4 +273,11 @@ func TestMetaTable(t *testing.T) { }) + t.Run("drop collection", func(t *testing.T) { + err := mt.DeleteCollection(2) + assert.NotNil(t, err) + err = mt.DeleteCollection(1) + assert.Nil(t, err) + }) + } diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index e11d818c85..029778ab74 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -38,9 +38,9 @@ func (bt *baseReqTask) Notify(err error) { func (bt *baseReqTask) WaitToFinish() error { select { case <-bt.core.ctx.Done(): - return fmt.Errorf("core context done, %s", bt.core.ctx.Err().Error()) + return fmt.Errorf("core context done, %w", bt.core.ctx.Err()) case <-bt.ctx.Done(): - return fmt.Errorf("request context done, %s", bt.ctx.Err().Error()) + return fmt.Errorf("request context done, %w", bt.ctx.Err()) case err, ok := <-bt.cv: if !ok { return fmt.Errorf("notify chan closed") @@ -103,7 +103,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { } schema.Fields = append(schema.Fields, rowIDField, timeStampField) - collID, err := t.core.idAllocator.AllocOne() + collID, _, err := t.core.idAllocator(1) if err != nil { return err } @@ -111,7 +111,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { if err != nil { return err } - partitionID, err := t.core.idAllocator.AllocOne() + partitionID, _, err := t.core.idAllocator(1) if err != nil { return err } @@ -412,7 +412,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { if err != nil { return err } - partitionID, err := t.core.idAllocator.AllocOne() + partitionID, _, err := t.core.idAllocator(1) if err != nil { return err } @@ -711,7 +711,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error { return fmt.Errorf("create index, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) } indexName := Params.DefaultIndexName //TODO, get name from request - indexID, err := t.core.idAllocator.AllocOne() + indexID, _, err := t.core.idAllocator(1) if err != nil { return err }