update ParameterTable

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/4973/head^2
cai.zhang 2020-11-21 15:06:46 +08:00 committed by yefu.chen
parent 8b88a9c306
commit 17c3a70d64
31 changed files with 175 additions and 203 deletions

3
.env
View File

@ -3,3 +3,6 @@ ARCH=amd64
UBUNTU=18.04
DATE_VERSION=20201120-092740
LATEST_DATE_VERSION=latest
PULSAR_ADDRESS=pulsar://localhost:6650
ETCD_ADDRESS=localhost:2379
MASTER_ADDRESS=localhost:53100

View File

@ -17,15 +17,14 @@ func main() {
// Creates server.
ctx, cancel := context.WithCancel(context.Background())
etcdAddress, _ := master.Params.EtcdAddress()
etcdRootPath, _ := master.Params.EtcdRootPath()
pulsarAddr, _ := master.Params.PulsarAddress()
pulsarAddr = "pulsar://" + pulsarAddr
etcdAddress := master.Params.EtcdAddress()
etcdRootPath := master.Params.EtcdRootPath()
pulsarAddr := master.Params.PulsarAddress()
defaultRecordSize := master.Params.DefaultRecordSize()
minimumAssignSize := master.Params.MinimumAssignSize()
segmentThreshold := master.Params.SegmentThreshold()
segmentExpireDuration := master.Params.SegmentExpireDuration()
numOfChannel, _ := master.Params.TopicNum()
numOfChannel := master.Params.TopicNum()
nodeNum, _ := master.Params.QueryNodeNum()
statsChannel := master.Params.StatsChannels()

View File

@ -19,6 +19,9 @@ services:
shm_size: 2G
environment:
<<: *ccache
PULSAR_ADDRESS: ${PULSAR_ADDRESS}
ETCD_ADDRESS: ${ETCD_ADDRESS}
MASTER_ADDRESS: ${MASTER_ADDRESS}
volumes: &ubuntu-volumes
- .:/go/src/github.com/zilliztech/milvus-distributed:delegated
- ${DOCKER_VOLUME_DIRECTORY:-.docker}/${ARCH}-ubuntu${UBUNTU}-cache:/ccache:delegated
@ -37,6 +40,10 @@ services:
security_opt: # options needed for gdb debugging
- seccomp:unconfined
- apparmor:unconfined
environment:
PULSAR_ADDRESS: ${PULSAR_ADDRESS}
ETCD_ADDRESS: ${ETCD_ADDRESS}
MASTER_ADDRESS: ${MASTER_ADDRESS}
volumes:
- .:/go/src/github.com/zilliztech/milvus-distributed:delegated
- ${DOCKER_VOLUME_DIRECTORY:-.docker}/${ARCH}-ubuntu${UBUNTU}-gdbserver-home:/home/debugger:delegated

View File

@ -20,7 +20,7 @@ func TestMain(m *testing.M) {
func TestEtcdKV_Load(t *testing.T) {
etcdAddr, err := Params.EtcdAddress()
etcdAddr, err := Params.Load("_EtcdAddress")
if err != nil {
panic(err)
}
@ -78,7 +78,7 @@ func TestEtcdKV_Load(t *testing.T) {
func TestEtcdKV_MultiSave(t *testing.T) {
etcdAddr, err := Params.EtcdAddress()
etcdAddr, err := Params.Load("_EtcdAddress")
if err != nil {
panic(err)
}
@ -109,7 +109,7 @@ func TestEtcdKV_MultiSave(t *testing.T) {
func TestEtcdKV_Remove(t *testing.T) {
etcdAddr, err := Params.EtcdAddress()
etcdAddr, err := Params.Load("_EtcdAddress")
if err != nil {
panic(err)
}
@ -180,7 +180,7 @@ func TestEtcdKV_Remove(t *testing.T) {
func TestEtcdKV_MultiSaveAndRemove(t *testing.T) {
etcdAddr, err := Params.EtcdAddress()
etcdAddr, err := Params.Load("_EtcdAddress")
if err != nil {
panic(err)
}

View File

@ -23,7 +23,7 @@ func TestMaster_CollectionTask(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
etcdAddr, _ := Params.EtcdAddress()
etcdAddr := Params.EtcdAddress()
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
@ -34,7 +34,7 @@ func TestMaster_CollectionTask(t *testing.T) {
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
PulsarAddr: Params.PulsarAddress(),
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",

View File

@ -15,10 +15,7 @@ var gTestIDAllocator *GlobalIDAllocator
func TestMain(m *testing.M) {
Params.Init()
etcdAddr, err := Params.EtcdAddress()
if err != nil {
panic(err)
}
etcdAddr := Params.EtcdAddress()
gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso"))
gTestIDAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid"))
exitCode := m.Run()

View File

@ -21,10 +21,7 @@ func TestMaster_CreateCollection(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
etcdAddr, err := Params.EtcdAddress()
if err != nil {
panic(err)
}
etcdAddr := Params.EtcdAddress()
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
@ -34,7 +31,7 @@ func TestMaster_CreateCollection(t *testing.T) {
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
PulsarAddr: Params.PulsarAddress(),
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",

View File

@ -15,10 +15,7 @@ import (
func TestMetaTable_Collection(t *testing.T) {
Init()
etcdAddr, err := Params.EtcdAddress()
if err != nil {
panic(err)
}
etcdAddr := Params.EtcdAddress()
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root")
@ -155,10 +152,7 @@ func TestMetaTable_Collection(t *testing.T) {
func TestMetaTable_DeletePartition(t *testing.T) {
Init()
etcdAddr, err := Params.EtcdAddress()
if err != nil {
panic(err)
}
etcdAddr := Params.EtcdAddress()
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
@ -249,10 +243,7 @@ func TestMetaTable_DeletePartition(t *testing.T) {
func TestMetaTable_Segment(t *testing.T) {
Init()
etcdAddr, err := Params.EtcdAddress()
if err != nil {
panic(err)
}
etcdAddr := Params.EtcdAddress()
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
@ -333,10 +324,7 @@ func TestMetaTable_Segment(t *testing.T) {
func TestMetaTable_UpdateSegment(t *testing.T) {
Init()
etcdAddr, err := Params.EtcdAddress()
if err != nil {
panic(err)
}
etcdAddr := Params.EtcdAddress()
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)

View File

@ -178,3 +178,39 @@ func (p *ParamTable) K2STimeSyncChannels() []string {
}
return strings.Split(chs, ",")
}
func (p *ParamTable) PulsarAddress() string {
pulsarAddress, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
}
return pulsarAddress
}
func (p *ParamTable) EtcdAddress() string {
etcdAddress, err := p.Load("_EtcdAddress")
if err != nil {
panic(err)
}
return etcdAddress
}
func (p *ParamTable) EtcdRootPath() string {
etcdRootPath, err := p.Load("etcd.rootpath")
if err != nil {
panic(err)
}
return etcdRootPath
}
func (p *ParamTable) TopicNum() int {
topicNum, err := p.Load("pulsar.topicnum")
if err != nil {
panic(err)
}
num, err := strconv.Atoi(topicNum)
if err != nil {
panic(err)
}
return num
}

View File

@ -24,11 +24,7 @@ func TestMaster_Partition(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
etcdAddr, err := Params.EtcdAddress()
if err != nil {
panic(err)
}
etcdAddr := Params.EtcdAddress()
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
@ -38,7 +34,7 @@ func TestMaster_Partition(t *testing.T) {
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
PulsarAddr: Params.PulsarAddress(),
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",

View File

@ -26,10 +26,7 @@ var kvBase *kv.EtcdKV
func setup() {
Params.Init()
etcdAddress, err := Params.EtcdAddress()
if err != nil {
panic(err)
}
etcdAddress := Params.EtcdAddress()
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
if err != nil {

View File

@ -79,7 +79,8 @@ func receiveMsg(stream *ms.MsgStream) []uint64 {
}
func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
Init()
pulsarAddress := Params.PulsarAddress()
producerChannels := []string{"proxyTtBarrier"}
consumerChannels := []string{"proxyTtBarrier"}

View File

@ -67,7 +67,8 @@ func getEmptyMsgPack() *ms.MsgPack {
}
func producer(channels []string, ttmsgs [][2]int) (*ms.MsgStream, *ms.MsgStream) {
pulsarAddress := "pulsar://localhost:6650"
Init()
pulsarAddress := Params.PulsarAddress()
consumerSubName := "subTimetick"
producerChannels := channels
consumerChannels := channels

View File

@ -372,8 +372,7 @@ func (ms *PulsarTtMsgStream) findTimeTick(channelIndex int,
}
}
//TODO test InMemMsgStream
//TODO test InMemMsgStream
/*
type InMemMsgStream struct {
buffer chan *MsgPack

View File

@ -4,12 +4,22 @@ import (
"context"
"fmt"
"log"
"os"
"testing"
commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
var Params paramtable.BaseTable
func TestMain(m *testing.M) {
Params.Init()
exitCode := m.Run()
os.Exit(exitCode)
}
func repackFunc(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range msgs {
@ -209,7 +219,7 @@ func receiveMsg(outputStream *MsgStream, msgCount int) {
}
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
@ -231,7 +241,7 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
}
func TestStream_PulsarMsgStream_Delete(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"delete"}
consumerChannels := []string{"delete"}
consumerSubName := "subDelete"
@ -251,7 +261,7 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) {
}
func TestStream_PulsarMsgStream_Search(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"search"}
consumerChannels := []string{"search"}
consumerSubName := "subSearch"
@ -271,7 +281,7 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) {
}
func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"searchResult"}
consumerChannels := []string{"searchResult"}
consumerSubName := "subSearchResult"
@ -291,7 +301,7 @@ func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
}
func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"timeTick"}
consumerChannels := []string{"timeTick"}
consumerSubName := "subTimeTick"
@ -311,7 +321,7 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
}
func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
@ -331,7 +341,7 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
}
func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
@ -351,7 +361,7 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
}
func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
@ -404,7 +414,7 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
}
func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
@ -454,7 +464,7 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
}
func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
@ -487,7 +497,7 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
}
func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"

View File

@ -118,7 +118,7 @@ func getInsertTask(reqID UniqueID, hashValue int32) TsMsg {
}
func TestStream_task_Insert(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"

View File

@ -23,7 +23,7 @@ func newInsertMsgUnmarshal(input []byte) (TsMsg, error) {
}
func TestStream_unmarshal_Insert(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"

View File

@ -13,3 +13,19 @@ var Params ParamTable
func (p *ParamTable) InitParamTable() {
p.Init()
}
func (p *ParamTable) MasterAddress() string {
masterAddress, err := p.Load("_MasterAddress")
if err != nil {
panic(err)
}
return masterAddress
}
func (p *ParamTable) PulsarAddress() string {
pulsarAddress, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
}
return pulsarAddress
}

View File

@ -58,7 +58,7 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
}
// TODO: use config instead
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress := Params.PulsarAddress()
bufSize := int64(1000)
manipulationChannels := []string{"manipulation"}
queryChannels := []string{"query"}
@ -81,10 +81,7 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
unmarshal,
bufSize)
masterAddr, err := Params.MasterAddress()
if err != nil {
panic(err)
}
masterAddr := Params.MasterAddress()
idAllocator, err := allocator.NewIDAllocator(p.proxyLoopCtx, masterAddr)
if err != nil {
@ -161,10 +158,7 @@ func (p *Proxy) grpcLoop() {
}
func (p *Proxy) connectMaster() error {
masterAddr, err := Params.MasterAddress()
if err != nil {
panic(err)
}
masterAddr := Params.MasterAddress()
log.Printf("Proxy connected to master, master_addr=%s", masterAddr)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

View File

@ -38,14 +38,8 @@ var testNum = 10
func startMaster(ctx context.Context) {
master.Init()
etcdAddr, err := master.Params.EtcdAddress()
if err != nil {
panic(err)
}
rootPath, err := master.Params.EtcdRootPath()
if err != nil {
panic(err)
}
etcdAddr := master.Params.EtcdAddress()
rootPath := master.Params.EtcdRootPath()
kvRootPath := path.Join(rootPath, "kv")
metaRootPath := path.Join(rootPath, "meta")
@ -62,7 +56,7 @@ func startMaster(ctx context.Context) {
KVRootPath: kvRootPath,
MetaRootPath: metaRootPath,
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
PulsarAddr: Params.PulsarAddress(),
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
@ -290,7 +284,7 @@ func TestProxy_Search(t *testing.T) {
queryResultChannels := []string{"QueryResult"}
bufSize := 1024
queryResultMsgStream := msgstream.NewPulsarMsgStream(ctx, int64(bufSize))
pulsarAddress := "pulsar://localhost:6650"
pulsarAddress := Params.PulsarAddress()
queryResultMsgStream.SetPulsarClient(pulsarAddress)
assert.NotEqual(t, queryResultMsgStream, nil, "query result message stream should not be nil!")
queryResultMsgStream.CreatePulsarProducers(queryResultChannels)

View File

@ -48,11 +48,7 @@ func newTimeTick(ctx context.Context,
bufSize := int64(1000)
t.tickMsgStream = msgstream.NewPulsarMsgStream(t.ctx, bufSize)
pulsarAddress, err := Params.PulsarAddress()
if err != nil {
panic(err)
}
pulsarAddress = "pulsar://" + pulsarAddress
pulsarAddress := Params.PulsarAddress()
producerChannels := []string{"timeTick"}
t.tickMsgStream.SetPulsarClient(pulsarAddress)

View File

@ -28,10 +28,7 @@ func TestTimeTick_Start(t *testing.T) {
func TestTimeTick_Start2(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
masterAddr, err := Params.MasterAddress()
if err != nil {
panic(err)
}
masterAddr := Params.MasterAddress()
tsoAllocator, err := allocator.NewTimestampAllocator(ctx, masterAddr)
assert.Nil(t, err)
err = tsoAllocator.Start()

View File

@ -32,7 +32,7 @@ func TestDataSyncService_Start(t *testing.T) {
}
// init query node
pulsarURL := "pulsar://localhost:6650"
pulsarURL, _ := Params.pulsarAddress()
node := NewQueryNode(ctx, 0)
// init meta

View File

@ -30,15 +30,8 @@ type metaService struct {
}
func newMetaService(ctx context.Context, replica *collectionReplica) *metaService {
ETCDAddr, err := Params.EtcdAddress()
if err != nil {
panic(err)
}
ETCDRootPath, err := Params.EtcdRootPath()
if err != nil {
panic(err)
}
ETCDAddr := Params.etcdAddress()
ETCDRootPath := Params.etcdRootPath()
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{ETCDAddr},
@ -78,28 +71,22 @@ func (mService *metaService) start() {
}
func GetCollectionObjID(key string) string {
ETCDRootPath, err := Params.EtcdRootPath()
if err != nil {
panic(err)
}
ETCDRootPath := Params.etcdRootPath()
prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/"
return strings.TrimPrefix(key, prefix)
}
func GetSegmentObjID(key string) string {
ETCDRootPath, err := Params.EtcdRootPath()
if err != nil {
panic(err)
}
ETCDRootPath := Params.etcdRootPath()
prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/"
return strings.TrimPrefix(key, prefix)
}
func isCollectionObj(key string) bool {
ETCDRootPath, err := Params.EtcdRootPath()
if err != nil {
panic(err)
}
ETCDRootPath := Params.etcdRootPath()
prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/"
prefix = strings.TrimSpace(prefix)
index := strings.Index(key, prefix)
@ -108,10 +95,8 @@ func isCollectionObj(key string) bool {
}
func isSegmentObj(key string) bool {
ETCDRootPath, err := Params.EtcdRootPath()
if err != nil {
panic(err)
}
ETCDRootPath := Params.etcdRootPath()
prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/"
prefix = strings.TrimSpace(prefix)
index := strings.Index(key, prefix)

View File

@ -3,6 +3,7 @@ package reader
import (
"context"
"math"
"os"
"testing"
"time"
@ -13,9 +14,14 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
func TestMain(m *testing.M) {
Params.Init()
exitCode := m.Run()
os.Exit(exitCode)
}
func TestMetaService_start(t *testing.T) {
var ctx context.Context
Params.Init()
if closeWithDeadline {
var cancel context.CancelFunc
@ -34,8 +40,6 @@ func TestMetaService_start(t *testing.T) {
}
func TestMetaService_getCollectionObjId(t *testing.T) {
Params.Init()
var key = "/collection/collection0"
var collectionObjID1 = GetCollectionObjID(key)
@ -48,8 +52,6 @@ func TestMetaService_getCollectionObjId(t *testing.T) {
}
func TestMetaService_getSegmentObjId(t *testing.T) {
Params.Init()
var key = "/segment/segment0"
var segmentObjID1 = GetSegmentObjID(key)
@ -62,8 +64,6 @@ func TestMetaService_getSegmentObjId(t *testing.T) {
}
func TestMetaService_isCollectionObj(t *testing.T) {
Params.Init()
var key = "by-dev/collection/collection0"
var b1 = isCollectionObj(key)
@ -76,8 +76,6 @@ func TestMetaService_isCollectionObj(t *testing.T) {
}
func TestMetaService_isSegmentObj(t *testing.T) {
Params.Init()
var key = "by-dev/segment/segment0"
var b1 = isSegmentObj(key)
@ -90,8 +88,6 @@ func TestMetaService_isSegmentObj(t *testing.T) {
}
func TestMetaService_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T) {
Params.Init()
var s = etcdpb.SegmentMeta{
SegmentID: UniqueID(0),
CollectionID: UniqueID(0),
@ -179,8 +175,6 @@ func TestMetaService_printSegmentStruct(t *testing.T) {
}
func TestMetaService_processCollectionCreate(t *testing.T) {
Params.Init()
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
@ -224,8 +218,6 @@ func TestMetaService_processCollectionCreate(t *testing.T) {
}
func TestMetaService_processSegmentCreate(t *testing.T) {
Params.Init()
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
@ -296,8 +288,6 @@ func TestMetaService_processSegmentCreate(t *testing.T) {
}
func TestMetaService_processCreate(t *testing.T) {
Params.Init()
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
@ -352,8 +342,6 @@ func TestMetaService_processCreate(t *testing.T) {
}
func TestMetaService_processSegmentModify(t *testing.T) {
Params.Init()
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
@ -435,8 +423,6 @@ func TestMetaService_processSegmentModify(t *testing.T) {
}
func TestMetaService_processCollectionModify(t *testing.T) {
Params.Init()
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
@ -508,8 +494,6 @@ func TestMetaService_processCollectionModify(t *testing.T) {
}
func TestMetaService_processModify(t *testing.T) {
Params.Init()
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
@ -606,8 +590,6 @@ func TestMetaService_processModify(t *testing.T) {
}
func TestMetaService_processSegmentDelete(t *testing.T) {
Params.Init()
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
@ -681,8 +663,6 @@ func TestMetaService_processSegmentDelete(t *testing.T) {
}
func TestMetaService_processCollectionDelete(t *testing.T) {
Params.Init()
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
@ -729,8 +709,6 @@ func TestMetaService_processCollectionDelete(t *testing.T) {
}
func TestMetaService_processDelete(t *testing.T) {
Params.Init()
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
@ -792,8 +770,6 @@ func TestMetaService_processDelete(t *testing.T) {
}
func TestMetaService_processResp(t *testing.T) {
Params.Init()
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
@ -819,8 +795,6 @@ func TestMetaService_processResp(t *testing.T) {
}
func TestMetaService_loadCollections(t *testing.T) {
Params.Init()
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
@ -840,8 +814,6 @@ func TestMetaService_loadCollections(t *testing.T) {
}
func TestMetaService_loadSegments(t *testing.T) {
Params.Init()
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc

View File

@ -25,7 +25,7 @@ func (p *ParamTable) pulsarAddress() (string, error) {
if err != nil {
panic(err)
}
return "pulsar://" + url, nil
return url, nil
}
func (p *ParamTable) queryNodeID() int {
@ -177,3 +177,19 @@ func (p *ParamTable) statsReceiveBufSize() int64 {
}
return int64(bufSize)
}
func (p *ParamTable) etcdAddress() string {
etcdAddress, err := p.Load("_EtcdAddress")
if err != nil {
panic(err)
}
return etcdAddress
}
func (p *ParamTable) etcdRootPath() string {
etcdRootPath, err := p.Load("etcd.rootpath")
if err != nil {
panic(err)
}
return etcdRootPath
}

View File

@ -1,6 +1,7 @@
package reader
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
@ -14,7 +15,9 @@ func TestParamTable_PulsarAddress(t *testing.T) {
Params.Init()
address, err := Params.pulsarAddress()
assert.NoError(t, err)
assert.Equal(t, address, "pulsar://localhost:6650")
split := strings.Split(address, ":")
assert.Equal(t, split[0], "pulsar")
assert.Equal(t, split[len(split)-1], "6650")
}
func TestParamTable_QueryNodeID(t *testing.T) {

View File

@ -26,7 +26,7 @@ func TestSearch_Search(t *testing.T) {
defer cancel()
// init query node
pulsarURL := "pulsar://localhost:6650"
pulsarURL, _ := Params.pulsarAddress()
node := NewQueryNode(ctx, 0)
// init meta

View File

@ -622,7 +622,7 @@ func TestSegment_segmentSearch(t *testing.T) {
dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"L2\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }"
pulsarURL := "pulsar://localhost:6650"
pulsarURL, _ := Params.pulsarAddress()
const receiveBufSize = 1024
searchProducerChannels := []string{"search"}
searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)

View File

@ -108,7 +108,7 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
}
// init query node
pulsarURL := "pulsar://localhost:6650"
pulsarURL, _ := Params.pulsarAddress()
node := NewQueryNode(ctx, 0)
// init meta

View File

@ -13,15 +13,14 @@ package paramtable
import (
"log"
"os"
"path"
"runtime"
"strconv"
"strings"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/spf13/cast"
"github.com/spf13/viper"
"github.com/zilliztech/milvus-distributed/internal/kv"
)
type Base interface {
@ -44,25 +43,28 @@ func (gp *BaseTable) Init() {
panic(err)
}
etcdAddress, _ := gp.Load("etcd.address")
etcdPort, _ := gp.Load("etcd.port")
etcdAddress += ":" + etcdPort
etcdAddress := os.Getenv("ETCD_ADDRESS")
if etcdAddress == "" {
etcdAddress = "localhost:2379"
}
err = gp.Save("_EtcdAddress", etcdAddress)
if err != nil {
panic(err)
}
pulsarAddress, _ := gp.Load("pulsar.address")
pulsarPort, _ := gp.Load("pulsar.port")
pulsarAddress += ":" + pulsarPort
pulsarAddress := os.Getenv("PULSAR_ADDRESS")
if pulsarAddress == "" {
pulsarAddress = "pulsar://localhost:6650"
}
err = gp.Save("_PulsarAddress", pulsarAddress)
if err != nil {
panic(err)
}
masterAddress, _ := gp.Load("master.address")
masterPort, _ := gp.Load("master.port")
masterAddress += ":" + masterPort
masterAddress := os.Getenv("MASTER_ADDRESS")
if masterAddress == "" {
masterAddress = "localhost:53100"
}
err = gp.Save("_MasterAddress", masterAddress)
if err != nil {
panic(err)
@ -109,7 +111,6 @@ func (gp *BaseTable) LoadYaml(fileName string) error {
log.Panicf("undefine config type, key=%s", key)
}
}
log.Printf("%s : %s", key, str)
err = gp.params.Save(strings.ToLower(key), str)
if err != nil {
panic(err)
@ -127,36 +128,3 @@ func (gp *BaseTable) Remove(key string) error {
func (gp *BaseTable) Save(key, value string) error {
return gp.params.Save(strings.ToLower(key), value)
}
func (gp *BaseTable) EtcdAddress() (string, error) {
return gp.Load("_EtcdAddress")
}
func (gp *BaseTable) PulsarAddress() (string, error) {
return gp.Load("_PulsarAddress")
}
func (gp *BaseTable) MasterAddress() (string, error) {
return gp.Load("_MasterAddress")
}
func (gp *BaseTable) EtcdRootPath() (string, error) {
return gp.Load("etcd.rootpath")
}
func (gp *BaseTable) TopicNum() (int, error) {
topicNum, _ := gp.Load("pulsar.topicnum")
return strconv.Atoi(topicNum)
}
func (gp *BaseTable) StorageAddress() (string, error) {
storageAddress, _ := gp.Load("storage.address")
storagePort, _ := gp.Load("storage.address")
return storageAddress + ":" + storagePort, nil
}
func (gp *BaseTable) BucketName() string {
bucketName, _ := gp.Load("writer.bucket")
return bucketName
}