Init param table and open unittest at ci

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
pull/4973/head^2
neza2017 2021-01-23 17:56:57 +08:00 committed by yefu.chen
parent c23e2de435
commit e2b45f9866
9 changed files with 226 additions and 21 deletions

View File

@ -13,6 +13,8 @@ msgChannel:
# channel name generation rule: ${namePrefix}-${ChannelIdx}
chanNamePrefix:
dataDefinition: "data-definition"
masterTimeTick: "master-timetick"
masterStatistics: "master-statistics"
insert: "insert"
delete: "delete"
search: "search"

View File

@ -10,4 +10,6 @@
# or implied. See the License for the specific language governing permissions and limitations under the License.
common:
defaultPartitionTag: _default
defaultPartitionTag: _default #TODO, remove
defaultPartitionName: "_default"
defaultIndexName: "_default_idx"

View File

@ -22,4 +22,5 @@ master:
# old name: segmentExpireDuration: 2000
IDAssignExpiration: 2000 # ms
maxPartitionNum: 4096
maxPartitionNum: 4096
nodeID: 100

View File

@ -4,6 +4,7 @@ import (
"fmt"
"math/rand"
"regexp"
"sync"
"testing"
"time"
@ -22,12 +23,15 @@ func TestGrpcService(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
cms.Params.Address = "127.0.0.1"
//cms.Params.Address = "127.0.0.1"
cms.Params.Port = (randVal % 100) + 10000
cms.Params.NodeID = 0
cms.Params.PulsarAddress = "pulsar://127.0.0.1:6650"
cms.Params.EtcdAddress = "127.0.0.1:2379"
svr, err := NewGrpcServer()
assert.Nil(t, err)
// cms.Params.NodeID = 0
//cms.Params.PulsarAddress = "pulsar://127.0.0.1:6650"
//cms.Params.EtcdAddress = "127.0.0.1:2379"
cms.Params.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal)
cms.Params.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal)
cms.Params.ProxyTimeTickChannel = fmt.Sprintf("proxyTimeTick%d", randVal)
@ -43,9 +47,6 @@ func TestGrpcService(t *testing.T) {
t.Logf("master service port = %d", cms.Params.Port)
svr, err := NewGrpcServer()
assert.Nil(t, err)
core := svr.core.(*cms.Core)
err = svr.Init(&cms.InitParams{ProxyTimeTickChannel: fmt.Sprintf("proxyTimeTick%d", randVal)})
@ -94,8 +95,11 @@ func TestGrpcService(t *testing.T) {
return []string{"file1", "file2", "file3"}, nil
}
var binlogLock sync.Mutex
binlogPathArray := make([]string, 0, 16)
core.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) {
binlogLock.Lock()
defer binlogLock.Unlock()
binlogPathArray = append(binlogPathArray, binlog...)
return 2000, nil
}
@ -109,7 +113,7 @@ func TestGrpcService(t *testing.T) {
err = svr.Start()
assert.Nil(t, err)
cli, err := NewGrpcClient(fmt.Sprintf("127.0.0.1:%d", cms.Params.Port), 3*time.Second)
cli, err := NewGrpcClient(fmt.Sprintf("%s:%d", cms.Params.Address, cms.Params.Port), 3*time.Second)
assert.Nil(t, err)
err = cli.Init(&cms.InitParams{ProxyTimeTickChannel: fmt.Sprintf("proxyTimeTick%d", randVal)})
@ -403,6 +407,8 @@ func TestGrpcService(t *testing.T) {
rsp, err := cli.CreateIndex(req)
assert.Nil(t, err)
assert.Equal(t, rsp.ErrorCode, commonpb.ErrorCode_SUCCESS)
binlogLock.Lock()
defer binlogLock.Unlock()
assert.Equal(t, 3, len(binlogPathArray))
assert.ElementsMatch(t, binlogPathArray, []string{"file1", "file2", "file3"})

View File

@ -96,14 +96,17 @@ type Interface interface {
// master core
type Core struct {
//TODO DataService Interface
//TODO IndexService Interface
//TODO ProxyServiceClient Interface, get proxy service time tick channel,InvalidateCollectionMetaCache
/*
ProxyServiceClient Interface:
get proxy service time tick channel,InvalidateCollectionMetaCache
//TODO Segment States Channel, from DataService, if create new segment, data service should put the segment id into this channel, and let the master add the segment id to the collection meta
DataService Interface:
Segment States Channel, from DataService, if create new segment, data service should put the segment id into this channel, and let the master add the segment id to the collection meta
Segment Flush Watcher, monitor if segment has flushed into disk
//TODO Segment Flush Watcher, monitor if segment has flushed into disk
//TODO indexBuilder Sch, tell index service to build index
IndexService Interface:
indexBuilder Sch, tell index service to build index
*/
MetaTable *metaTable
//id allocator
@ -485,8 +488,8 @@ func (c *Core) setMsgStreams() error {
}
// receive time tick from msg stream
c.ProxyTimeTickChan = make(chan typeutil.Timestamp, 1024)
go func() {
c.ProxyTimeTickChan = make(chan typeutil.Timestamp, 1024)
for {
select {
case <-c.ctx.Done():
@ -515,6 +518,7 @@ func (c *Core) setMsgStreams() error {
dataServiceStream.CreatePulsarConsumers([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
dataServiceStream.Start()
c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024)
c.DataNodeSegmentFlushCompletedChan = make(chan typeutil.UniqueID, 1024)
// receive segment info from msg stream
go func() {
@ -536,7 +540,7 @@ func (c *Core) setMsgStreams() error {
if ok {
c.DataNodeSegmentFlushCompletedChan <- flushMsg.SegmentFlushCompletedMsg.SegmentID
} else {
log.Printf("receiver unexpected msg from data service stream, value = %v", segm)
log.Printf("receive unexpected msg from data service stream, value = %v", segm)
}
}
}

View File

@ -18,7 +18,8 @@ import (
func TestMetaTable(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
etcdAddr := "127.0.0.1:2379"
Params.Init()
etcdAddr := Params.EtcdAddress
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})

View File

@ -17,14 +17,149 @@ type ParamTable struct {
EtcdAddress string
MetaRootPath string
KvRootPath string
ProxyTimeTickChannel string
ProxyTimeTickChannel string //get from proxy client
MsgChannelSubName string
TimeTickChannel string
DdChannel string
StatisticsChannel string
DataServiceSegmentChannel string // data service create segment, or data node flush segment
DataServiceSegmentChannel string // get from data service, data service create segment, or data node flush segment
MaxPartitionNum int64
DefaultPartitionName string
DefaultIndexName string
}
func (p *ParamTable) Init() {
// load yaml
p.BaseTable.Init()
err := p.LoadYaml("advanced/master.yaml")
if err != nil {
panic(err)
}
p.initAddress()
p.initPort()
p.initNodeID()
p.initPulsarAddress()
p.initEtcdAddress()
p.initMetaRootPath()
p.initKvRootPath()
p.initMsgChannelSubName()
p.initTimeTickChannel()
p.initDdChannelName()
p.initStatisticsChannelName()
p.initMaxPartitionNum()
p.initDefaultPartitionName()
p.initDefaultIndexName()
}
func (p *ParamTable) initAddress() {
masterAddress, err := p.Load("master.address")
if err != nil {
panic(err)
}
p.Address = masterAddress
}
func (p *ParamTable) initPort() {
p.Port = p.ParseInt("master.port")
}
func (p *ParamTable) initNodeID() {
p.NodeID = uint64(p.ParseInt64("master.nodeID"))
}
func (p *ParamTable) initPulsarAddress() {
addr, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.PulsarAddress = addr
}
func (p *ParamTable) initEtcdAddress() {
addr, err := p.Load("_EtcdAddress")
if err != nil {
panic(err)
}
p.EtcdAddress = addr
}
func (p *ParamTable) initMetaRootPath() {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := p.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
p.MetaRootPath = rootPath + "/" + subPath
}
func (p *ParamTable) initKvRootPath() {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := p.Load("etcd.kvSubPath")
if err != nil {
panic(err)
}
p.KvRootPath = rootPath + "/" + subPath
}
func (p *ParamTable) initMsgChannelSubName() {
name, err := p.Load("msgChannel.subNamePrefix.masterSubNamePrefix")
if err != nil {
panic(err)
}
p.MsgChannelSubName = name
}
func (p *ParamTable) initTimeTickChannel() {
channel, err := p.Load("msgChannel.chanNamePrefix.masterTimeTick")
if err != nil {
panic(err)
}
p.TimeTickChannel = channel
}
func (p *ParamTable) initDdChannelName() {
channel, err := p.Load("msgChannel.chanNamePrefix.dataDefinition")
if err != nil {
panic(err)
}
p.DdChannel = channel
}
func (p *ParamTable) initStatisticsChannelName() {
channel, err := p.Load("msgChannel.chanNamePrefix.masterStatistics")
if err != nil {
panic(err)
}
p.StatisticsChannel = channel
}
func (p *ParamTable) initMaxPartitionNum() {
p.MaxPartitionNum = p.ParseInt64("master.maxPartitionNum")
}
func (p *ParamTable) initDefaultPartitionName() {
name, err := p.Load("common.defaultPartitionName")
if err != nil {
panic(err)
}
p.DefaultPartitionName = name
}
func (p *ParamTable) initDefaultIndexName() {
name, err := p.Load("common.defaultIndexName")
if err != nil {
panic(err)
}
p.DefaultIndexName = name
}

View File

@ -0,0 +1,53 @@
package masterservice
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable(t *testing.T) {
Params.Init()
assert.NotEqual(t, Params.Address, "")
t.Logf("master address = %s", Params.Address)
assert.NotEqual(t, Params.Port, 0)
t.Logf("master port = %d", Params.Port)
assert.NotEqual(t, Params.NodeID, 0)
t.Logf("master node ID = %d", Params.NodeID)
assert.NotEqual(t, Params.PulsarAddress, "")
t.Logf("pulsar address = %s", Params.PulsarAddress)
assert.NotEqual(t, Params.EtcdAddress, "")
t.Logf("etcd address = %s", Params.EtcdAddress)
assert.NotEqual(t, Params.MetaRootPath, "")
t.Logf("meta root path = %s", Params.MetaRootPath)
assert.NotEqual(t, Params.KvRootPath, "")
t.Logf("kv root path = %s", Params.KvRootPath)
assert.NotEqual(t, Params.MsgChannelSubName, "")
t.Logf("msg channel sub name = %s", Params.MsgChannelSubName)
assert.NotEqual(t, Params.TimeTickChannel, "")
t.Logf("master time tick channel = %s", Params.TimeTickChannel)
assert.NotEqual(t, Params.DdChannel, "")
t.Logf("master dd channel = %s", Params.DdChannel)
assert.NotEqual(t, Params.StatisticsChannel, "")
t.Logf("master statistics channel = %s", Params.StatisticsChannel)
assert.NotEqual(t, Params.MaxPartitionNum, 0)
t.Logf("master initMaxPartitionNum = %d", Params.MaxPartitionNum)
assert.NotEqual(t, Params.DefaultPartitionName, "")
t.Logf("default partition name = %s", Params.DefaultPartitionName)
assert.NotEqual(t, Params.DefaultIndexName, "")
t.Logf("default index name = %s", Params.DefaultIndexName)
}

View File

@ -20,4 +20,5 @@ go test -race -cover "${MILVUS_DIR}/writenode/..." -failfast
go test -race -cover "${MILVUS_DIR}/master/..." -failfast
go test -race -cover "${MILVUS_DIR}/indexnode/..." -failfast
go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast
go test -race -cover -v "${MILVUS_DIR}/masterservice" "${MILVUS_DIR}/distributed/masterservice" -failfast
#go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast