mirror of https://github.com/milvus-io/milvus.git
Move channel configuration from channel.yaml to milvus.yaml (#8420)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/8617/head
parent
174c6275ca
commit
31ec399edb
|
@ -132,3 +132,34 @@ log:
|
|||
maxAge: 10 # day
|
||||
maxBackups: 20
|
||||
format: text # text/json
|
||||
|
||||
msgChannel:
|
||||
# channel name generation rule: ${namePrefix}-${ChannelIdx}
|
||||
chanNamePrefix:
|
||||
cluster: "by-dev"
|
||||
rootCoordTimeTick: "rootcoord-timetick"
|
||||
rootCoordStatistics: "rootcoord-statistics"
|
||||
rootCoordDml: "rootcoord-dml"
|
||||
search: "search"
|
||||
searchResult: "searchResult"
|
||||
proxyTimeTick: "proxyTimeTick"
|
||||
queryTimeTick: "queryTimeTick"
|
||||
queryNodeStats: "query-node-stats"
|
||||
# cmd for loadIndex, flush, etc...
|
||||
cmd: "cmd"
|
||||
dataCoordInsertChannel: "insert-channel-"
|
||||
dataCoordStatistic: "datacoord-statistics-channel"
|
||||
dataCoordTimeTick: "datacoord-timetick-channel"
|
||||
dataCoordSegmentInfo: "segment-info-channel"
|
||||
|
||||
# sub name generation rule: ${subNamePrefix}-${NodeID}
|
||||
subNamePrefix:
|
||||
rootCoordSubNamePrefix: "rootCoord"
|
||||
proxySubNamePrefix: "proxy"
|
||||
queryNodeSubNamePrefix: "queryNode"
|
||||
dataNodeSubNamePrefix: "dataNode"
|
||||
dataCoordSubNamePrefix: "dataCoord"
|
||||
|
||||
common:
|
||||
defaultPartitionName: "_default" # default partition name for a collection
|
||||
defaultIndexName: "_default_idx" # default index name
|
|
@ -17,6 +17,7 @@ import (
|
|||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
|
@ -47,28 +48,15 @@ type BaseTable struct {
|
|||
func (gp *BaseTable) Init() {
|
||||
gp.params = memkv.NewMemoryKV()
|
||||
|
||||
_, fpath, _, _ := runtime.Caller(0)
|
||||
configDir := path.Dir(fpath) + "/../../../configs/"
|
||||
if _, err := os.Stat(configDir); err != nil {
|
||||
log.Warn("cannot access config directory", zap.String("configDir", configDir), zap.Error(err))
|
||||
if runPath, err1 := os.Getwd(); err1 != nil {
|
||||
panic(err1.Error())
|
||||
} else {
|
||||
configDir = runPath + "/configs/"
|
||||
}
|
||||
}
|
||||
gp.configDir = configDir
|
||||
gp.configDir = gp.initConfPath()
|
||||
log.Debug("config directory", zap.String("configDir", gp.configDir))
|
||||
|
||||
if err := gp.LoadYaml("milvus.yaml"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := gp.LoadYaml("advanced/common.yaml"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := gp.LoadYaml("advanced/channel.yaml"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
gp.loadFromMilvusYaml()
|
||||
|
||||
// TODO remove once we change helm deployment
|
||||
gp.loadFromCommonYaml()
|
||||
gp.loadFromChannelYaml()
|
||||
|
||||
gp.tryloadFromEnv()
|
||||
}
|
||||
|
||||
|
@ -86,6 +74,54 @@ func (gp *BaseTable) LoadFromKVPair(kvPairs []*commonpb.KeyValuePair) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (gp *BaseTable) initConfPath() string {
|
||||
// check if user set conf dir through env
|
||||
configDir, find := syscall.Getenv("MILVUSCONF")
|
||||
if !find {
|
||||
runPath, err := os.Getwd()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
configDir = runPath + "/configs/"
|
||||
if _, err := os.Stat(configDir); err != nil {
|
||||
_, fpath, _, _ := runtime.Caller(0)
|
||||
// TODO, this is a hack, need to find better solution for relative path
|
||||
configDir = path.Dir(fpath) + "/../../../configs/"
|
||||
}
|
||||
}
|
||||
return configDir
|
||||
}
|
||||
|
||||
func (gp *BaseTable) loadFromMilvusYaml() {
|
||||
if err := gp.LoadYaml("milvus.yaml"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (gp *BaseTable) loadFromCommonYaml() bool {
|
||||
configFile := gp.configDir + "advanced/common.yaml"
|
||||
if _, err := os.Stat(configFile); err == nil {
|
||||
if err := gp.LoadYaml("advanced/common.yaml"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return true
|
||||
}
|
||||
log.Debug("failed to find common.yaml in config, skip..")
|
||||
return false
|
||||
}
|
||||
|
||||
func (gp *BaseTable) loadFromChannelYaml() bool {
|
||||
configFile := gp.configDir + "advanced/channel.yaml"
|
||||
if _, err := os.Stat(configFile); err == nil {
|
||||
if err := gp.LoadYaml("advanced/channel.yaml"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return true
|
||||
}
|
||||
log.Debug("failed to find channel.yaml in config, skip..")
|
||||
return false
|
||||
}
|
||||
|
||||
func (gp *BaseTable) tryloadFromEnv() {
|
||||
minioAddress := os.Getenv("MINIO_ADDRESS")
|
||||
if minioAddress == "" {
|
||||
|
|
|
@ -15,6 +15,8 @@ import (
|
|||
"os"
|
||||
"testing"
|
||||
|
||||
"path/filepath"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
@ -124,8 +126,6 @@ func TestBaseTable_Remove(t *testing.T) {
|
|||
func TestBaseTable_LoadYaml(t *testing.T) {
|
||||
err := baseParams.LoadYaml("milvus.yaml")
|
||||
assert.Nil(t, err)
|
||||
err = baseParams.LoadYaml("advanced/channel.yaml")
|
||||
assert.Nil(t, err)
|
||||
assert.Panics(t, func() { baseParams.LoadYaml("advanced/not_exist.yaml") })
|
||||
|
||||
_, err = baseParams.Load("etcd.address")
|
||||
|
@ -134,6 +134,37 @@ func TestBaseTable_LoadYaml(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestBaseTable_ConfDir(t *testing.T) {
|
||||
rightConfig := baseParams.configDir
|
||||
// fake dir
|
||||
baseParams.configDir = "./"
|
||||
|
||||
assert.Panics(t, func() { baseParams.loadFromMilvusYaml() })
|
||||
assert.False(t, baseParams.loadFromCommonYaml())
|
||||
assert.False(t, baseParams.loadFromChannelYaml())
|
||||
|
||||
baseParams.configDir = rightConfig
|
||||
assert.True(t, baseParams.loadFromCommonYaml())
|
||||
assert.True(t, baseParams.loadFromChannelYaml())
|
||||
}
|
||||
|
||||
func TestBateTable_ConfPath(t *testing.T) {
|
||||
os.Setenv("MILVUSCONF", "test")
|
||||
config := baseParams.initConfPath()
|
||||
assert.Equal(t, config, "test")
|
||||
|
||||
os.Unsetenv("MILVUSCONF")
|
||||
dir, _ := os.Getwd()
|
||||
config = baseParams.initConfPath()
|
||||
assert.Equal(t, filepath.Clean(config), filepath.Clean(dir+"/../../../configs/"))
|
||||
|
||||
// test use get dir
|
||||
os.Chdir(dir + "/../../../")
|
||||
defer os.Chdir(dir)
|
||||
config = baseParams.initConfPath()
|
||||
assert.Equal(t, filepath.Clean(config), filepath.Clean(dir+"/../../../configs/"))
|
||||
}
|
||||
|
||||
func TestBaseTable_Parse(t *testing.T) {
|
||||
t.Run("ParseBool", func(t *testing.T) {
|
||||
assert.Nil(t, baseParams.Save("key", "true"))
|
||||
|
|
Loading…
Reference in New Issue