mirror of https://github.com/milvus-io/milvus.git
Simplify proxy config
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/4973/head^2
parent
2ebf3b55e4
commit
7c39fc24a9
|
@ -34,4 +34,6 @@ msgChannel:
|
|||
channelRange:
|
||||
insert: [0, 15]
|
||||
delete: [0, 15]
|
||||
k2s: [0, 15]
|
||||
k2s: [0, 15]
|
||||
search: [0, 0]
|
||||
searchResult: [0, 1]
|
|
@ -10,4 +10,19 @@
|
|||
# or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
proxy:
|
||||
timeTickInterval: 200 # ms
|
||||
timeTickInterval: 200 # ms
|
||||
|
||||
msgStream:
|
||||
insert:
|
||||
#streamBufSize: 1024 # msgPack chan buffer size
|
||||
bufSize: 1024 # msgPack chan buffer size
|
||||
|
||||
search:
|
||||
bufSize: 512
|
||||
|
||||
searchResult:
|
||||
recvBufSize: 1024 # msgPack chan buffer size
|
||||
pulsarBufSize: 1024 # pulsar chan buffer size
|
||||
|
||||
timeTick:
|
||||
bufSize: 512
|
|
@ -22,6 +22,7 @@ var globalMetaCache MetaCache
|
|||
|
||||
type SimpleMetaCache struct {
|
||||
mu sync.RWMutex
|
||||
proxyID UniqueID
|
||||
metas map[string]*servicepb.CollectionDescription // collection name to schema
|
||||
masterClient masterpb.MasterClient
|
||||
reqIDAllocator *allocator.IDAllocator
|
||||
|
@ -59,7 +60,7 @@ func (smc *SimpleMetaCache) Update(collectionName string) error {
|
|||
MsgType: internalpb.MsgType_kDescribeCollection,
|
||||
ReqID: reqID,
|
||||
Timestamp: ts,
|
||||
ProxyID: 0,
|
||||
ProxyID: smc.proxyID,
|
||||
CollectionName: &servicepb.CollectionName{
|
||||
CollectionName: collectionName,
|
||||
},
|
||||
|
@ -86,6 +87,7 @@ func newSimpleMetaCache(ctx context.Context,
|
|||
masterClient: mCli,
|
||||
reqIDAllocator: idAllocator,
|
||||
tsoAllocator: tsoAllocator,
|
||||
proxyID: Params.ProxyID(),
|
||||
ctx: ctx,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,12 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
|
||||
)
|
||||
|
||||
|
@ -10,22 +16,289 @@ type ParamTable struct {
|
|||
|
||||
var Params ParamTable
|
||||
|
||||
func (p *ParamTable) InitParamTable() {
|
||||
p.Init()
|
||||
}
|
||||
|
||||
func (p *ParamTable) MasterAddress() string {
|
||||
masterAddress, err := p.Load("_MasterAddress")
|
||||
func (pt *ParamTable) Init() {
|
||||
pt.BaseTable.Init()
|
||||
err := pt.LoadYaml("milvus.yaml")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return masterAddress
|
||||
}
|
||||
|
||||
func (p *ParamTable) PulsarAddress() string {
|
||||
pulsarAddress, err := p.Load("_PulsarAddress")
|
||||
err = pt.LoadYaml("advanced/proxy.yaml")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return pulsarAddress
|
||||
err = pt.LoadYaml("advanced/channel.yaml")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
proxyIDStr := os.Getenv("PROXY_ID")
|
||||
if proxyIDStr == "" {
|
||||
proxyIDStr = "2"
|
||||
}
|
||||
pt.Save("_proxyID", proxyIDStr)
|
||||
}
|
||||
|
||||
func (pt *ParamTable) MasterAddress() string {
|
||||
ret, err := pt.Load("_MasterAddress")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) PulsarAddress() string {
|
||||
ret, err := pt.Load("_PulsarAddress")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) ProxyNum() int {
|
||||
ret := pt.ProxyIDList()
|
||||
return len(ret)
|
||||
}
|
||||
|
||||
func (pt *ParamTable) ProxyIDList() []UniqueID {
|
||||
proxyIDStr, err := pt.Load("nodeID.proxyIDList")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var ret []UniqueID
|
||||
proxyIDs := strings.Split(proxyIDStr, ",")
|
||||
for _, i := range proxyIDs {
|
||||
v, err := strconv.Atoi(i)
|
||||
if err != nil {
|
||||
log.Panicf("load proxy id list error, %s", err.Error())
|
||||
}
|
||||
ret = append(ret, UniqueID(v))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) ProxyID() UniqueID {
|
||||
proxyID, err := pt.Load("_proxyID")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
ID, err := strconv.Atoi(proxyID)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return UniqueID(ID)
|
||||
}
|
||||
|
||||
func (pt *ParamTable) TimeTickInterval() time.Duration {
|
||||
internalStr, err := pt.Load("proxy.timeTickInterval")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
interval, err := strconv.Atoi(internalStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return time.Duration(interval) * time.Millisecond
|
||||
}
|
||||
|
||||
func (pt *ParamTable) convertRangeToSlice(rangeStr, sep string) []int {
|
||||
channelIDs := strings.Split(rangeStr, sep)
|
||||
startStr := channelIDs[0]
|
||||
endStr := channelIDs[1]
|
||||
start, err := strconv.Atoi(startStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
end, err := strconv.Atoi(endStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var ret []int
|
||||
for i := start; i <= end; i++ {
|
||||
ret = append(ret, i)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) sliceIndex() int {
|
||||
proxyID := pt.ProxyID()
|
||||
proxyIDList := pt.ProxyIDList()
|
||||
for i := 0; i < len(proxyIDList); i++ {
|
||||
if proxyID == proxyIDList[i] {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func (pt *ParamTable) InsertChannelNames() []string {
|
||||
prefix, err := pt.Load("msgChannel.chanNamePrefix.insert")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
prefix += "-"
|
||||
iRangeStr, err := pt.Load("msgChannel.channelRange.insert")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
channelIDs := pt.convertRangeToSlice(iRangeStr, ",")
|
||||
var ret []string
|
||||
for _, ID := range channelIDs {
|
||||
ret = append(ret, prefix+strconv.Itoa(ID))
|
||||
}
|
||||
|
||||
proxyNum := pt.ProxyNum()
|
||||
sep := len(channelIDs) / proxyNum
|
||||
index := pt.sliceIndex()
|
||||
if index == -1 {
|
||||
panic("ProxyID not Match with Config")
|
||||
}
|
||||
start := index * sep
|
||||
return ret[start : start+sep]
|
||||
}
|
||||
|
||||
func (pt *ParamTable) DeleteChannelNames() []string {
|
||||
prefix, err := pt.Load("msgChannel.chanNamePrefix.delete")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
prefix += "-"
|
||||
dRangeStr, err := pt.Load("msgChannel.channelRange.delete")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
channelIDs := pt.convertRangeToSlice(dRangeStr, ",")
|
||||
var ret []string
|
||||
for _, ID := range channelIDs {
|
||||
ret = append(ret, prefix+strconv.Itoa(ID))
|
||||
}
|
||||
proxyNum := pt.ProxyNum()
|
||||
sep := len(channelIDs) / proxyNum
|
||||
index := pt.sliceIndex()
|
||||
if index == -1 {
|
||||
panic("ProxyID not Match with Config")
|
||||
}
|
||||
start := index * sep
|
||||
return ret[start : start+sep]
|
||||
}
|
||||
|
||||
func (pt *ParamTable) K2SChannelNames() []string {
|
||||
prefix, err := pt.Load("msgChannel.chanNamePrefix.k2s")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
prefix += "-"
|
||||
k2sRangeStr, err := pt.Load("msgChannel.channelRange.k2s")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
channelIDs := pt.convertRangeToSlice(k2sRangeStr, ",")
|
||||
var ret []string
|
||||
for _, ID := range channelIDs {
|
||||
ret = append(ret, prefix+strconv.Itoa(ID))
|
||||
}
|
||||
proxyNum := pt.ProxyNum()
|
||||
sep := len(channelIDs) / proxyNum
|
||||
index := pt.sliceIndex()
|
||||
if index == -1 {
|
||||
panic("ProxyID not Match with Config")
|
||||
}
|
||||
start := index * sep
|
||||
return ret[start : start+sep]
|
||||
}
|
||||
|
||||
func (pt *ParamTable) SearchChannelNames() []string {
|
||||
prefix, err := pt.Load("msgChannel.chanNamePrefix.search")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
prefix += "-0"
|
||||
return []string{prefix}
|
||||
}
|
||||
|
||||
func (pt *ParamTable) SearchResultChannelNames() []string {
|
||||
prefix, err := pt.Load("msgChannel.chanNamePrefix.searchResult")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
prefix += "-"
|
||||
sRangeStr, err := pt.Load("msgChannel.channelRange.searchResult")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
channelIDs := pt.convertRangeToSlice(sRangeStr, ",")
|
||||
var ret []string
|
||||
for _, ID := range channelIDs {
|
||||
ret = append(ret, prefix+strconv.Itoa(ID))
|
||||
}
|
||||
proxyNum := pt.ProxyNum()
|
||||
sep := len(channelIDs) / proxyNum
|
||||
index := pt.sliceIndex()
|
||||
if index == -1 {
|
||||
panic("ProxyID not Match with Config")
|
||||
}
|
||||
start := index * sep
|
||||
|
||||
return ret[start : start+sep]
|
||||
}
|
||||
|
||||
func (pt *ParamTable) ProxySubName() string {
|
||||
prefix, err := pt.Load("msgChannel.subNamePrefix.proxySubNamePrefix")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
proxyIDStr, err := pt.Load("_proxyID")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return prefix + "-" + proxyIDStr
|
||||
}
|
||||
|
||||
func (pt *ParamTable) ProxyTimeTickChannelNames() []string {
|
||||
prefix, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
prefix += "-0"
|
||||
return []string{prefix}
|
||||
}
|
||||
|
||||
func (pt *ParamTable) DataDefinitionChannelNames() []string {
|
||||
prefix, err := pt.Load("msgChannel.chanNamePrefix.dataDefinition")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
prefix += "-0"
|
||||
return []string{prefix}
|
||||
}
|
||||
|
||||
func (pt *ParamTable) parseInt64(key string) int64 {
|
||||
valueStr, err := pt.Load(key)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
value, err := strconv.Atoi(valueStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return int64(value)
|
||||
}
|
||||
|
||||
func (pt *ParamTable) MsgStreamInsertBufSize() int64 {
|
||||
return pt.parseInt64("proxy.msgStream.insert.bufSize")
|
||||
}
|
||||
|
||||
func (pt *ParamTable) MsgStreamSearchBufSize() int64 {
|
||||
return pt.parseInt64("proxy.msgStream.search.bufSize")
|
||||
}
|
||||
|
||||
func (pt *ParamTable) MsgStreamSearchResultBufSize() int64 {
|
||||
return pt.parseInt64("proxy.msgStream.searchResult.recvBufSize")
|
||||
}
|
||||
|
||||
func (pt *ParamTable) MsgStreamSearchResultPulsarBufSize() int64 {
|
||||
return pt.parseInt64("proxy.msgStream.searchResult.pulsarBufSize")
|
||||
}
|
||||
|
||||
func (pt *ParamTable) MsgStreamTimeTickBufSize() int64 {
|
||||
return pt.parseInt64("proxy.msgStream.timeTick.bufSize")
|
||||
}
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParamTable_InsertChannelRange(t *testing.T) {
|
||||
ret := Params.InsertChannelNames()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_DeleteChannelNames(t *testing.T) {
|
||||
ret := Params.DeleteChannelNames()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_K2SChannelNames(t *testing.T) {
|
||||
ret := Params.K2SChannelNames()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_SearchChannelNames(t *testing.T) {
|
||||
ret := Params.SearchChannelNames()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_SearchResultChannelNames(t *testing.T) {
|
||||
ret := Params.SearchResultChannelNames()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_ProxySubName(t *testing.T) {
|
||||
ret := Params.ProxySubName()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_ProxyTimeTickChannelNames(t *testing.T) {
|
||||
ret := Params.ProxyTimeTickChannelNames()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_DataDefinitionChannelNames(t *testing.T) {
|
||||
ret := Params.DataDefinitionChannelNames()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_MsgStreamInsertBufSize(t *testing.T) {
|
||||
ret := Params.MsgStreamInsertBufSize()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_MsgStreamSearchBufSize(t *testing.T) {
|
||||
ret := Params.MsgStreamSearchBufSize()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_MsgStreamSearchResultBufSize(t *testing.T) {
|
||||
ret := Params.MsgStreamSearchResultBufSize()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_MsgStreamSearchResultPulsarBufSize(t *testing.T) {
|
||||
ret := Params.MsgStreamSearchResultPulsarBufSize()
|
||||
fmt.Println(ret)
|
||||
}
|
||||
|
||||
func TestParamTable_MsgStreamTimeTickBufSize(t *testing.T) {
|
||||
ret := Params.MsgStreamTimeTickBufSize()
|
||||
fmt.Println(ret)
|
||||
}
|
|
@ -44,7 +44,7 @@ type Proxy struct {
|
|||
}
|
||||
|
||||
func Init() {
|
||||
Params.InitParamTable()
|
||||
Params.Init()
|
||||
}
|
||||
|
||||
func CreateProxy(ctx context.Context) (*Proxy, error) {
|
||||
|
@ -57,17 +57,14 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
|
|||
|
||||
// TODO: use config instead
|
||||
pulsarAddress := Params.PulsarAddress()
|
||||
bufSize := int64(1000)
|
||||
manipulationChannels := []string{"manipulation"}
|
||||
queryChannels := []string{"query"}
|
||||
|
||||
p.manipulationMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
|
||||
p.manipulationMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamInsertBufSize())
|
||||
p.manipulationMsgStream.SetPulsarClient(pulsarAddress)
|
||||
p.manipulationMsgStream.CreatePulsarProducers(manipulationChannels)
|
||||
p.manipulationMsgStream.CreatePulsarProducers(Params.InsertChannelNames())
|
||||
|
||||
p.queryMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
|
||||
p.queryMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamSearchBufSize())
|
||||
p.queryMsgStream.SetPulsarClient(pulsarAddress)
|
||||
p.queryMsgStream.CreatePulsarProducers(queryChannels)
|
||||
p.queryMsgStream.CreatePulsarProducers(Params.SearchChannelNames())
|
||||
|
||||
masterAddr := Params.MasterAddress()
|
||||
idAllocator, err := allocator.NewIDAllocator(p.proxyLoopCtx, masterAddr)
|
||||
|
|
|
@ -363,18 +363,13 @@ func (sched *TaskScheduler) queryResultLoop() {
|
|||
defer sched.wg.Done()
|
||||
|
||||
// TODO: use config instead
|
||||
pulsarAddress := "pulsar://localhost:6650"
|
||||
bufSize := int64(1000)
|
||||
queryResultChannels := []string{"QueryResult"}
|
||||
queryResultSubName := "QueryResultSubject"
|
||||
unmarshal := msgstream.NewUnmarshalDispatcher()
|
||||
|
||||
queryResultMsgStream := msgstream.NewPulsarMsgStream(sched.ctx, bufSize)
|
||||
queryResultMsgStream.SetPulsarClient(pulsarAddress)
|
||||
queryResultMsgStream.CreatePulsarConsumers(queryResultChannels,
|
||||
queryResultSubName,
|
||||
queryResultMsgStream := msgstream.NewPulsarMsgStream(sched.ctx, Params.MsgStreamSearchResultBufSize())
|
||||
queryResultMsgStream.SetPulsarClient(Params.PulsarAddress())
|
||||
queryResultMsgStream.CreatePulsarConsumers(Params.SearchResultChannelNames(),
|
||||
Params.ProxySubName(),
|
||||
unmarshal,
|
||||
bufSize)
|
||||
Params.MsgStreamSearchResultPulsarBufSize())
|
||||
|
||||
queryResultMsgStream.Start()
|
||||
defer queryResultMsgStream.Close()
|
||||
|
|
|
@ -17,7 +17,7 @@ type tickCheckFunc = func(Timestamp) bool
|
|||
type timeTick struct {
|
||||
lastTick Timestamp
|
||||
currentTick Timestamp
|
||||
interval int64
|
||||
interval time.Duration
|
||||
|
||||
pulsarProducer pulsar.Producer
|
||||
|
||||
|
@ -35,24 +35,23 @@ type timeTick struct {
|
|||
|
||||
func newTimeTick(ctx context.Context,
|
||||
tsoAllocator *allocator.TimestampAllocator,
|
||||
interval time.Duration,
|
||||
checkFunc tickCheckFunc) *timeTick {
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
t := &timeTick{
|
||||
ctx: ctx1,
|
||||
cancel: cancel,
|
||||
tsoAllocator: tsoAllocator,
|
||||
interval: 200,
|
||||
interval: interval,
|
||||
peerID: 1,
|
||||
checkFunc: checkFunc,
|
||||
}
|
||||
|
||||
bufSize := int64(1000)
|
||||
t.tickMsgStream = msgstream.NewPulsarMsgStream(t.ctx, bufSize)
|
||||
t.tickMsgStream = msgstream.NewPulsarMsgStream(t.ctx, Params.MsgStreamTimeTickBufSize())
|
||||
pulsarAddress := Params.PulsarAddress()
|
||||
|
||||
producerChannels := []string{"timeTick"}
|
||||
t.tickMsgStream.SetPulsarClient(pulsarAddress)
|
||||
t.tickMsgStream.CreatePulsarProducers(producerChannels)
|
||||
t.tickMsgStream.CreatePulsarProducers(Params.ProxyTimeTickChannelNames())
|
||||
return t
|
||||
}
|
||||
|
||||
|
@ -85,7 +84,7 @@ func (tt *timeTick) tick() error {
|
|||
|
||||
func (tt *timeTick) tickLoop() {
|
||||
defer tt.wg.Done()
|
||||
tt.timer = time.NewTicker(time.Millisecond * time.Duration(tt.interval))
|
||||
tt.timer = time.NewTicker(tt.interval)
|
||||
for {
|
||||
select {
|
||||
case <-tt.timer.C:
|
||||
|
|
|
@ -34,7 +34,7 @@ func TestTimeTick_Start2(t *testing.T) {
|
|||
err = tsoAllocator.Start()
|
||||
assert.Nil(t, err)
|
||||
|
||||
tt := newTimeTick(ctx, tsoAllocator, checkFunc)
|
||||
tt := newTimeTick(ctx, tsoAllocator, Params.TimeTickInterval(), checkFunc)
|
||||
|
||||
defer func() {
|
||||
cancel()
|
||||
|
|
Loading…
Reference in New Issue