From e2b45f9866da8f7d1bd487dace520056d5721f39 Mon Sep 17 00:00:00 2001 From: neza2017 Date: Sat, 23 Jan 2021 17:56:57 +0800 Subject: [PATCH] Init param table and open unittest at ci Signed-off-by: neza2017 --- configs/advanced/channel.yaml | 2 + configs/advanced/common.yaml | 4 +- configs/advanced/master.yaml | 3 +- .../masterservice/masterservice_test.go | 22 ++- internal/masterservice/master_service.go | 20 ++- internal/masterservice/meta_table_test.go | 3 +- internal/masterservice/param_table.go | 139 +++++++++++++++++- internal/masterservice/param_table_test.go | 53 +++++++ scripts/run_go_unittest.sh | 1 + 9 files changed, 226 insertions(+), 21 deletions(-) create mode 100644 internal/masterservice/param_table_test.go diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index adedbee98e..a11c7fccb0 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -13,6 +13,8 @@ msgChannel: # channel name generation rule: ${namePrefix}-${ChannelIdx} chanNamePrefix: dataDefinition: "data-definition" + masterTimeTick: "master-timetick" + masterStatistics: "master-statistics" insert: "insert" delete: "delete" search: "search" diff --git a/configs/advanced/common.yaml b/configs/advanced/common.yaml index 367aa3dc65..f1f5a25ffe 100644 --- a/configs/advanced/common.yaml +++ b/configs/advanced/common.yaml @@ -10,4 +10,6 @@ # or implied. See the License for the specific language governing permissions and limitations under the License. common: - defaultPartitionTag: _default + defaultPartitionTag: _default #TODO, remove + defaultPartitionName: "_default" + defaultIndexName: "_default_idx" diff --git a/configs/advanced/master.yaml b/configs/advanced/master.yaml index b6386554bb..69474322ae 100644 --- a/configs/advanced/master.yaml +++ b/configs/advanced/master.yaml @@ -22,4 +22,5 @@ master: # old name: segmentExpireDuration: 2000 IDAssignExpiration: 2000 # ms - maxPartitionNum: 4096 \ No newline at end of file + maxPartitionNum: 4096 + nodeID: 100 \ No newline at end of file diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 3d3219a65e..933447d2da 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math/rand" "regexp" + "sync" "testing" "time" @@ -22,12 +23,15 @@ func TestGrpcService(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() - cms.Params.Address = "127.0.0.1" + //cms.Params.Address = "127.0.0.1" cms.Params.Port = (randVal % 100) + 10000 - cms.Params.NodeID = 0 - cms.Params.PulsarAddress = "pulsar://127.0.0.1:6650" - cms.Params.EtcdAddress = "127.0.0.1:2379" + svr, err := NewGrpcServer() + assert.Nil(t, err) + + // cms.Params.NodeID = 0 + //cms.Params.PulsarAddress = "pulsar://127.0.0.1:6650" + //cms.Params.EtcdAddress = "127.0.0.1:2379" cms.Params.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) cms.Params.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal) cms.Params.ProxyTimeTickChannel = fmt.Sprintf("proxyTimeTick%d", randVal) @@ -43,9 +47,6 @@ func TestGrpcService(t *testing.T) { t.Logf("master service port = %d", cms.Params.Port) - svr, err := NewGrpcServer() - assert.Nil(t, err) - core := svr.core.(*cms.Core) err = svr.Init(&cms.InitParams{ProxyTimeTickChannel: fmt.Sprintf("proxyTimeTick%d", randVal)}) @@ -94,8 +95,11 @@ func TestGrpcService(t *testing.T) { return []string{"file1", "file2", "file3"}, nil } + var binlogLock sync.Mutex binlogPathArray := make([]string, 0, 16) core.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) { + binlogLock.Lock() + defer binlogLock.Unlock() binlogPathArray = append(binlogPathArray, binlog...) return 2000, nil } @@ -109,7 +113,7 @@ func TestGrpcService(t *testing.T) { err = svr.Start() assert.Nil(t, err) - cli, err := NewGrpcClient(fmt.Sprintf("127.0.0.1:%d", cms.Params.Port), 3*time.Second) + cli, err := NewGrpcClient(fmt.Sprintf("%s:%d", cms.Params.Address, cms.Params.Port), 3*time.Second) assert.Nil(t, err) err = cli.Init(&cms.InitParams{ProxyTimeTickChannel: fmt.Sprintf("proxyTimeTick%d", randVal)}) @@ -403,6 +407,8 @@ func TestGrpcService(t *testing.T) { rsp, err := cli.CreateIndex(req) assert.Nil(t, err) assert.Equal(t, rsp.ErrorCode, commonpb.ErrorCode_SUCCESS) + binlogLock.Lock() + defer binlogLock.Unlock() assert.Equal(t, 3, len(binlogPathArray)) assert.ElementsMatch(t, binlogPathArray, []string{"file1", "file2", "file3"}) diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 33b5a47bb7..ae27748381 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -96,14 +96,17 @@ type Interface interface { // master core type Core struct { - //TODO DataService Interface - //TODO IndexService Interface - //TODO ProxyServiceClient Interface, get proxy service time tick channel,InvalidateCollectionMetaCache + /* + ProxyServiceClient Interface: + get proxy service time tick channel,InvalidateCollectionMetaCache - //TODO Segment States Channel, from DataService, if create new segment, data service should put the segment id into this channel, and let the master add the segment id to the collection meta + DataService Interface: + Segment States Channel, from DataService, if create new segment, data service should put the segment id into this channel, and let the master add the segment id to the collection meta + Segment Flush Watcher, monitor if segment has flushed into disk - //TODO Segment Flush Watcher, monitor if segment has flushed into disk - //TODO indexBuilder Sch, tell index service to build index + IndexService Interface: + indexBuilder Sch, tell index service to build index + */ MetaTable *metaTable //id allocator @@ -485,8 +488,8 @@ func (c *Core) setMsgStreams() error { } // receive time tick from msg stream + c.ProxyTimeTickChan = make(chan typeutil.Timestamp, 1024) go func() { - c.ProxyTimeTickChan = make(chan typeutil.Timestamp, 1024) for { select { case <-c.ctx.Done(): @@ -515,6 +518,7 @@ func (c *Core) setMsgStreams() error { dataServiceStream.CreatePulsarConsumers([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024) dataServiceStream.Start() c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024) + c.DataNodeSegmentFlushCompletedChan = make(chan typeutil.UniqueID, 1024) // receive segment info from msg stream go func() { @@ -536,7 +540,7 @@ func (c *Core) setMsgStreams() error { if ok { c.DataNodeSegmentFlushCompletedChan <- flushMsg.SegmentFlushCompletedMsg.SegmentID } else { - log.Printf("receiver unexpected msg from data service stream, value = %v", segm) + log.Printf("receive unexpected msg from data service stream, value = %v", segm) } } } diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index 3e0e4d8b30..24486a3d0d 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -18,7 +18,8 @@ import ( func TestMetaTable(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() - etcdAddr := "127.0.0.1:2379" + Params.Init() + etcdAddr := Params.EtcdAddress rootPath := fmt.Sprintf("/test/meta/%d", randVal) etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index 7d15e119e8..61dde4ba09 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -17,14 +17,149 @@ type ParamTable struct { EtcdAddress string MetaRootPath string KvRootPath string - ProxyTimeTickChannel string + ProxyTimeTickChannel string //get from proxy client MsgChannelSubName string TimeTickChannel string DdChannel string StatisticsChannel string - DataServiceSegmentChannel string // data service create segment, or data node flush segment + DataServiceSegmentChannel string // get from data service, data service create segment, or data node flush segment MaxPartitionNum int64 DefaultPartitionName string DefaultIndexName string } + +func (p *ParamTable) Init() { + // load yaml + p.BaseTable.Init() + err := p.LoadYaml("advanced/master.yaml") + if err != nil { + panic(err) + } + + p.initAddress() + p.initPort() + p.initNodeID() + + p.initPulsarAddress() + p.initEtcdAddress() + p.initMetaRootPath() + p.initKvRootPath() + + p.initMsgChannelSubName() + p.initTimeTickChannel() + p.initDdChannelName() + p.initStatisticsChannelName() + + p.initMaxPartitionNum() + p.initDefaultPartitionName() + p.initDefaultIndexName() +} + +func (p *ParamTable) initAddress() { + masterAddress, err := p.Load("master.address") + if err != nil { + panic(err) + } + p.Address = masterAddress +} + +func (p *ParamTable) initPort() { + p.Port = p.ParseInt("master.port") +} + +func (p *ParamTable) initNodeID() { + p.NodeID = uint64(p.ParseInt64("master.nodeID")) +} + +func (p *ParamTable) initPulsarAddress() { + addr, err := p.Load("_PulsarAddress") + if err != nil { + panic(err) + } + p.PulsarAddress = addr +} + +func (p *ParamTable) initEtcdAddress() { + addr, err := p.Load("_EtcdAddress") + if err != nil { + panic(err) + } + p.EtcdAddress = addr +} + +func (p *ParamTable) initMetaRootPath() { + rootPath, err := p.Load("etcd.rootPath") + if err != nil { + panic(err) + } + subPath, err := p.Load("etcd.metaSubPath") + if err != nil { + panic(err) + } + p.MetaRootPath = rootPath + "/" + subPath +} + +func (p *ParamTable) initKvRootPath() { + rootPath, err := p.Load("etcd.rootPath") + if err != nil { + panic(err) + } + subPath, err := p.Load("etcd.kvSubPath") + if err != nil { + panic(err) + } + p.KvRootPath = rootPath + "/" + subPath +} + +func (p *ParamTable) initMsgChannelSubName() { + name, err := p.Load("msgChannel.subNamePrefix.masterSubNamePrefix") + if err != nil { + panic(err) + } + p.MsgChannelSubName = name +} + +func (p *ParamTable) initTimeTickChannel() { + channel, err := p.Load("msgChannel.chanNamePrefix.masterTimeTick") + if err != nil { + panic(err) + } + p.TimeTickChannel = channel +} + +func (p *ParamTable) initDdChannelName() { + channel, err := p.Load("msgChannel.chanNamePrefix.dataDefinition") + if err != nil { + panic(err) + } + p.DdChannel = channel +} + +func (p *ParamTable) initStatisticsChannelName() { + channel, err := p.Load("msgChannel.chanNamePrefix.masterStatistics") + if err != nil { + panic(err) + } + p.StatisticsChannel = channel +} + +func (p *ParamTable) initMaxPartitionNum() { + p.MaxPartitionNum = p.ParseInt64("master.maxPartitionNum") +} + +func (p *ParamTable) initDefaultPartitionName() { + name, err := p.Load("common.defaultPartitionName") + if err != nil { + panic(err) + } + p.DefaultPartitionName = name +} + +func (p *ParamTable) initDefaultIndexName() { + name, err := p.Load("common.defaultIndexName") + if err != nil { + panic(err) + } + p.DefaultIndexName = name +} diff --git a/internal/masterservice/param_table_test.go b/internal/masterservice/param_table_test.go new file mode 100644 index 0000000000..2c2b071e7c --- /dev/null +++ b/internal/masterservice/param_table_test.go @@ -0,0 +1,53 @@ +package masterservice + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + assert.NotEqual(t, Params.Address, "") + t.Logf("master address = %s", Params.Address) + + assert.NotEqual(t, Params.Port, 0) + t.Logf("master port = %d", Params.Port) + + assert.NotEqual(t, Params.NodeID, 0) + t.Logf("master node ID = %d", Params.NodeID) + + assert.NotEqual(t, Params.PulsarAddress, "") + t.Logf("pulsar address = %s", Params.PulsarAddress) + + assert.NotEqual(t, Params.EtcdAddress, "") + t.Logf("etcd address = %s", Params.EtcdAddress) + + assert.NotEqual(t, Params.MetaRootPath, "") + t.Logf("meta root path = %s", Params.MetaRootPath) + + assert.NotEqual(t, Params.KvRootPath, "") + t.Logf("kv root path = %s", Params.KvRootPath) + + assert.NotEqual(t, Params.MsgChannelSubName, "") + t.Logf("msg channel sub name = %s", Params.MsgChannelSubName) + + assert.NotEqual(t, Params.TimeTickChannel, "") + t.Logf("master time tick channel = %s", Params.TimeTickChannel) + + assert.NotEqual(t, Params.DdChannel, "") + t.Logf("master dd channel = %s", Params.DdChannel) + + assert.NotEqual(t, Params.StatisticsChannel, "") + t.Logf("master statistics channel = %s", Params.StatisticsChannel) + + assert.NotEqual(t, Params.MaxPartitionNum, 0) + t.Logf("master initMaxPartitionNum = %d", Params.MaxPartitionNum) + + assert.NotEqual(t, Params.DefaultPartitionName, "") + t.Logf("default partition name = %s", Params.DefaultPartitionName) + + assert.NotEqual(t, Params.DefaultIndexName, "") + t.Logf("default index name = %s", Params.DefaultIndexName) +} diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 48438fa2a7..fc37534e36 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -20,4 +20,5 @@ go test -race -cover "${MILVUS_DIR}/writenode/..." -failfast go test -race -cover "${MILVUS_DIR}/master/..." -failfast go test -race -cover "${MILVUS_DIR}/indexnode/..." -failfast go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast +go test -race -cover -v "${MILVUS_DIR}/masterservice" "${MILVUS_DIR}/distributed/masterservice" -failfast #go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast