Add metaRootPath and kvRootPath

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2020-11-27 09:59:40 +08:00 committed by yefu.chen
parent c507abdeaa
commit 1decc1a4ca
13 changed files with 75 additions and 35 deletions

View File

@ -19,6 +19,8 @@ etcd:
address: localhost address: localhost
port: 2379 port: 2379
rootPath: by-dev rootPath: by-dev
metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath
kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
segThreshold: 10000 segThreshold: 10000
pulsar: pulsar:

View File

@ -33,7 +33,8 @@ func TestMaster_CollectionTask(t *testing.T) {
Port: Params.Port, Port: Params.Port,
EtcdAddress: Params.EtcdAddress, EtcdAddress: Params.EtcdAddress,
EtcdRootPath: "/test/root", MetaRootPath: "/test/root/meta",
KvRootPath: "/test/root/kv",
PulsarAddress: Params.PulsarAddress, PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2}, ProxyIDList: []typeutil.UniqueID{1, 2},

View File

@ -32,7 +32,8 @@ func TestMaster_CreateCollection(t *testing.T) {
Port: Params.Port, Port: Params.Port,
EtcdAddress: Params.EtcdAddress, EtcdAddress: Params.EtcdAddress,
EtcdRootPath: "/test/root", MetaRootPath: "/test/root/meta",
KvRootPath: "/test/root/kv",
PulsarAddress: Params.PulsarAddress, PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2}, ProxyIDList: []typeutil.UniqueID{1, 2},

View File

@ -82,8 +82,8 @@ func Init() {
func CreateServer(ctx context.Context) (*Master, error) { func CreateServer(ctx context.Context) (*Master, error) {
//Init(etcdAddr, kvRootPath) //Init(etcdAddr, kvRootPath)
etcdAddress := Params.EtcdAddress etcdAddress := Params.EtcdAddress
metaRootPath := Params.EtcdRootPath metaRootPath := Params.MetaRootPath
kvRootPath := Params.EtcdRootPath kvRootPath := Params.KvRootPath
pulsarAddr := Params.PulsarAddress pulsarAddr := Params.PulsarAddress
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})

View File

@ -17,7 +17,8 @@ type ParamTable struct {
Port int Port int
EtcdAddress string EtcdAddress string
EtcdRootPath string MetaRootPath string
KvRootPath string
PulsarAddress string PulsarAddress string
// nodeID // nodeID
@ -75,7 +76,8 @@ func (p *ParamTable) Init() {
p.initPort() p.initPort()
p.initEtcdAddress() p.initEtcdAddress()
p.initEtcdRootPath() p.initMetaRootPath()
p.initKvRootPath()
p.initPulsarAddress() p.initPulsarAddress()
p.initProxyIDList() p.initProxyIDList()
@ -138,12 +140,28 @@ func (p *ParamTable) initPulsarAddress() {
p.PulsarAddress = addr p.PulsarAddress = addr
} }
func (p *ParamTable) initEtcdRootPath() { func (p *ParamTable) initMetaRootPath() {
path, err := p.Load("etcd.rootpath") rootPath, err := p.Load("etcd.rootPath")
if err != nil { if err != nil {
panic(err) panic(err)
} }
p.EtcdRootPath = path subPath, err := p.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
p.MetaRootPath = rootPath + "/" + subPath
}
func (p *ParamTable) initKvRootPath() {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := p.Load("etcd.kvSubPath")
if err != nil {
panic(err)
}
p.KvRootPath = rootPath + "/" + subPath
} }
func (p *ParamTable) initTopicNum() { func (p *ParamTable) initTopicNum() {

View File

@ -22,10 +22,16 @@ func TestParamTable_Port(t *testing.T) {
assert.Equal(t, port, 53100) assert.Equal(t, port, 53100)
} }
func TestParamTable_EtcdRootPath(t *testing.T) { func TestParamTable_MetaRootPath(t *testing.T) {
Params.Init() Params.Init()
addr := Params.EtcdRootPath path := Params.MetaRootPath
assert.Equal(t, addr, "by-dev") assert.Equal(t, path, "by-dev/meta")
}
func TestParamTable_KVRootPath(t *testing.T) {
Params.Init()
path := Params.KvRootPath
assert.Equal(t, path, "by-dev/kv")
} }
func TestParamTable_TopicNum(t *testing.T) { func TestParamTable_TopicNum(t *testing.T) {

View File

@ -35,7 +35,8 @@ func TestMaster_Partition(t *testing.T) {
Port: Params.Port, Port: Params.Port,
EtcdAddress: Params.EtcdAddress, EtcdAddress: Params.EtcdAddress,
EtcdRootPath: "/test/root", MetaRootPath: "/test/root/meta",
KvRootPath: "/test/root/kv",
PulsarAddress: Params.PulsarAddress, PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2}, ProxyIDList: []typeutil.UniqueID{1, 2},

View File

@ -236,7 +236,8 @@ func startupMaster() {
Port: Params.Port, Port: Params.Port,
EtcdAddress: Params.EtcdAddress, EtcdAddress: Params.EtcdAddress,
EtcdRootPath: rootPath, MetaRootPath: "/test/root/meta",
KvRootPath: "/test/root/kv",
PulsarAddress: Params.PulsarAddress, PulsarAddress: Params.PulsarAddress,
ProxyIDList: []typeutil.UniqueID{1, 2}, ProxyIDList: []typeutil.UniqueID{1, 2},

View File

@ -37,13 +37,13 @@ var testNum = 10
func startMaster(ctx context.Context) { func startMaster(ctx context.Context) {
master.Init() master.Init()
etcdAddr := master.Params.EtcdAddress etcdAddr := master.Params.EtcdAddress
rootPath := master.Params.EtcdRootPath metaRootPath := master.Params.MetaRootPath
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
if err != nil { if err != nil {
panic(err) panic(err)
} }
_, err = etcdCli.Delete(context.TODO(), rootPath, clientv3.WithPrefix()) _, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix())
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -31,7 +31,7 @@ type metaService struct {
func newMetaService(ctx context.Context, replica *collectionReplica) *metaService { func newMetaService(ctx context.Context, replica *collectionReplica) *metaService {
ETCDAddr := Params.etcdAddress() ETCDAddr := Params.etcdAddress()
ETCDRootPath := Params.etcdRootPath() MetaRootPath := Params.metaRootPath()
cli, _ := clientv3.New(clientv3.Config{ cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{ETCDAddr}, Endpoints: []string{ETCDAddr},
@ -40,7 +40,7 @@ func newMetaService(ctx context.Context, replica *collectionReplica) *metaServic
return &metaService{ return &metaService{
ctx: ctx, ctx: ctx,
kvBase: kv.NewEtcdKV(cli, ETCDRootPath), kvBase: kv.NewEtcdKV(cli, MetaRootPath),
replica: replica, replica: replica,
} }
} }
@ -71,21 +71,21 @@ func (mService *metaService) start() {
} }
func GetCollectionObjID(key string) string { func GetCollectionObjID(key string) string {
ETCDRootPath := Params.etcdRootPath() ETCDRootPath := Params.metaRootPath()
prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/"
return strings.TrimPrefix(key, prefix) return strings.TrimPrefix(key, prefix)
} }
func GetSegmentObjID(key string) string { func GetSegmentObjID(key string) string {
ETCDRootPath := Params.etcdRootPath() ETCDRootPath := Params.metaRootPath()
prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/" prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/"
return strings.TrimPrefix(key, prefix) return strings.TrimPrefix(key, prefix)
} }
func isCollectionObj(key string) bool { func isCollectionObj(key string) bool {
ETCDRootPath := Params.etcdRootPath() ETCDRootPath := Params.metaRootPath()
prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/"
prefix = strings.TrimSpace(prefix) prefix = strings.TrimSpace(prefix)
@ -95,7 +95,7 @@ func isCollectionObj(key string) bool {
} }
func isSegmentObj(key string) bool { func isSegmentObj(key string) bool {
ETCDRootPath := Params.etcdRootPath() ETCDRootPath := Params.metaRootPath()
prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/" prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/"
prefix = strings.TrimSpace(prefix) prefix = strings.TrimSpace(prefix)

View File

@ -64,24 +64,24 @@ func TestMetaService_getSegmentObjId(t *testing.T) {
} }
func TestMetaService_isCollectionObj(t *testing.T) { func TestMetaService_isCollectionObj(t *testing.T) {
var key = "by-dev/collection/collection0" var key = "by-dev/meta/collection/collection0"
var b1 = isCollectionObj(key) var b1 = isCollectionObj(key)
assert.Equal(t, b1, true) assert.Equal(t, b1, true)
key = "by-dev/segment/segment0" key = "by-dev/meta/segment/segment0"
var b2 = isCollectionObj(key) var b2 = isCollectionObj(key)
assert.Equal(t, b2, false) assert.Equal(t, b2, false)
} }
func TestMetaService_isSegmentObj(t *testing.T) { func TestMetaService_isSegmentObj(t *testing.T) {
var key = "by-dev/segment/segment0" var key = "by-dev/meta/segment/segment0"
var b1 = isSegmentObj(key) var b1 = isSegmentObj(key)
assert.Equal(t, b1, true) assert.Equal(t, b1, true)
key = "by-dev/collection/collection0" key = "by-dev/meta/collection/collection0"
var b2 = isSegmentObj(key) var b2 = isSegmentObj(key)
assert.Equal(t, b2, false) assert.Equal(t, b2, false)
@ -295,7 +295,7 @@ func TestMetaService_processCreate(t *testing.T) {
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica) node.metaService = newMetaService(ctx, node.replica)
key1 := "by-dev/collection/0" key1 := "by-dev/meta/collection/0"
msg1 := `schema: < msg1 := `schema: <
name: "test" name: "test"
fields: < fields: <
@ -327,7 +327,7 @@ func TestMetaService_processCreate(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
key2 := "by-dev/segment/0" key2 := "by-dev/meta/segment/0"
msg2 := `partition_tag: "default" msg2 := `partition_tag: "default"
channel_start: 0 channel_start: 0
channel_end: 1 channel_end: 1
@ -529,7 +529,7 @@ func TestMetaService_processModify(t *testing.T) {
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica) node.metaService = newMetaService(ctx, node.replica)
key1 := "by-dev/collection/0" key1 := "by-dev/meta/collection/0"
msg1 := `schema: < msg1 := `schema: <
name: "test" name: "test"
fields: < fields: <
@ -576,7 +576,7 @@ func TestMetaService_processModify(t *testing.T) {
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3") hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3")
assert.Equal(t, hasPartition, false) assert.Equal(t, hasPartition, false)
key2 := "by-dev/segment/0" key2 := "by-dev/meta/segment/0"
msg2 := `partition_tag: "p1" msg2 := `partition_tag: "p1"
channel_start: 0 channel_start: 0
channel_end: 1 channel_end: 1
@ -772,7 +772,7 @@ func TestMetaService_processDelete(t *testing.T) {
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
node.metaService = newMetaService(ctx, node.replica) node.metaService = newMetaService(ctx, node.replica)
key1 := "by-dev/collection/0" key1 := "by-dev/meta/collection/0"
msg1 := `schema: < msg1 := `schema: <
name: "test" name: "test"
fields: < fields: <
@ -804,7 +804,7 @@ func TestMetaService_processDelete(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
key2 := "by-dev/segment/0" key2 := "by-dev/meta/segment/0"
msg2 := `partition_tag: "default" msg2 := `partition_tag: "default"
channel_start: 0 channel_start: 0
channel_end: 1 channel_end: 1

View File

@ -199,12 +199,16 @@ func (p *ParamTable) etcdAddress() string {
return etcdAddress return etcdAddress
} }
func (p *ParamTable) etcdRootPath() string { func (p *ParamTable) metaRootPath() string {
etcdRootPath, err := p.Load("etcd.rootpath") rootPath, err := p.Load("etcd.rootPath")
if err != nil { if err != nil {
panic(err) panic(err)
} }
return etcdRootPath subPath, err := p.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
return rootPath + "/" + subPath
} }
func (p *ParamTable) gracefulTime() int64 { func (p *ParamTable) gracefulTime() int64 {

View File

@ -120,3 +120,9 @@ func TestParamTable_statsChannelName(t *testing.T) {
name := Params.statsChannelName() name := Params.statsChannelName()
assert.Equal(t, name, "query-node-stats") assert.Equal(t, name, "query-node-stats")
} }
func TestParamTable_metaRootPath(t *testing.T) {
Params.Init()
path := Params.metaRootPath()
assert.Equal(t, path, "by-dev/meta")
}