mirror of https://github.com/milvus-io/milvus.git
Cherry-pick from master pr: #38924 Related to #38923 This PR: - Check whether `os.Stat` config file error is io.ErrNotExist - Panic when get config return error during Milvus initialization --------- --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/39193/head
parent
782cd749bb
commit
29a0e31c2f
|
@ -17,9 +17,11 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
@ -38,7 +40,10 @@ func Init(opts ...Option) (*Manager, error) {
|
|||
sourceManager := NewManager()
|
||||
if o.FileInfo != nil {
|
||||
s := NewFileSource(o.FileInfo)
|
||||
sourceManager.AddSource(s)
|
||||
err := sourceManager.AddSource(s)
|
||||
if err != nil {
|
||||
log.Fatal("failed to add FileSource config", zap.Error(err))
|
||||
}
|
||||
}
|
||||
if o.EnvKeyFormatter != nil {
|
||||
sourceManager.AddSource(NewEnvSource(o.EnvKeyFormatter))
|
||||
|
|
|
@ -123,9 +123,14 @@ func (fs *FileSource) loadFromFile() error {
|
|||
configFiles = fs.files
|
||||
fs.RUnlock()
|
||||
|
||||
notExistsNum := 0
|
||||
for _, configFile := range configFiles {
|
||||
if _, err := os.Stat(configFile); err != nil {
|
||||
continue
|
||||
if os.IsNotExist(err) {
|
||||
notExistsNum++
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
yamlReader.SetConfigFile(configFile)
|
||||
|
@ -161,6 +166,10 @@ func (fs *FileSource) loadFromFile() error {
|
|||
newConfig[formatKey(key)] = str
|
||||
}
|
||||
}
|
||||
// not allow all config files missing, return error for this case
|
||||
if notExistsNum == len(configFiles) {
|
||||
return errors.Newf("all config files not exists, files: %v", configFiles)
|
||||
}
|
||||
|
||||
return fs.update(newConfig)
|
||||
}
|
||||
|
|
|
@ -136,6 +136,7 @@ func TestOnEvent(t *testing.T) {
|
|||
|
||||
dir, _ := os.MkdirTemp("", "milvus")
|
||||
yamlFile := path.Join(dir, "milvus.yaml")
|
||||
os.WriteFile(yamlFile, []byte("a.b: \"\""), 0o600)
|
||||
mgr, _ := Init(WithEnvSource(formatKey),
|
||||
WithFilesSource(&FileInfo{
|
||||
Files: []string{yamlFile},
|
||||
|
@ -147,31 +148,41 @@ func TestOnEvent(t *testing.T) {
|
|||
RefreshInterval: 10 * time.Millisecond,
|
||||
}))
|
||||
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
|
||||
time.Sleep(time.Second)
|
||||
value, err := mgr.GetConfig("a.b")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, value, "aaa")
|
||||
assert.Eventually(t, func() bool {
|
||||
value, err := mgr.GetConfig("a.b")
|
||||
assert.NoError(t, err)
|
||||
return value == "aaa"
|
||||
}, time.Second*5, time.Second)
|
||||
|
||||
ctx := context.Background()
|
||||
client.KV.Put(ctx, "test/config/a/b", "bbb")
|
||||
time.Sleep(time.Second)
|
||||
value, err = mgr.GetConfig("a.b")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, value, "bbb")
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
value, err := mgr.GetConfig("a.b")
|
||||
assert.NoError(t, err)
|
||||
return value == "bbb"
|
||||
}, time.Second*5, time.Second)
|
||||
|
||||
client.KV.Put(ctx, "test/config/a/b", "ccc")
|
||||
time.Sleep(time.Second)
|
||||
value, err = mgr.GetConfig("a.b")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, value, "ccc")
|
||||
assert.Eventually(t, func() bool {
|
||||
value, err := mgr.GetConfig("a.b")
|
||||
assert.NoError(t, err)
|
||||
return value == "ccc"
|
||||
}, time.Second*5, time.Second)
|
||||
|
||||
os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600)
|
||||
time.Sleep(time.Second)
|
||||
value, err = mgr.GetConfig("a.b")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, value, "ccc")
|
||||
assert.Eventually(t, func() bool {
|
||||
value, err := mgr.GetConfig("a.b")
|
||||
assert.NoError(t, err)
|
||||
return value == "ccc"
|
||||
}, time.Second*5, time.Second)
|
||||
|
||||
client.KV.Delete(ctx, "test/config/a/b")
|
||||
time.Sleep(time.Second)
|
||||
value, err = mgr.GetConfig("a.b")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, value, "ddd")
|
||||
assert.Eventually(t, func() bool {
|
||||
value, err := mgr.GetConfig("a.b")
|
||||
assert.NoError(t, err)
|
||||
return value == "ddd"
|
||||
}, time.Second*5, time.Second)
|
||||
}
|
||||
|
||||
func TestDeadlock(t *testing.T) {
|
||||
|
@ -206,6 +217,7 @@ func TestCachedConfig(t *testing.T) {
|
|||
|
||||
dir, _ := os.MkdirTemp("", "milvus")
|
||||
yamlFile := path.Join(dir, "milvus.yaml")
|
||||
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
|
||||
mgr, _ := Init(WithEnvSource(formatKey),
|
||||
WithFilesSource(&FileInfo{
|
||||
Files: []string{yamlFile},
|
||||
|
@ -218,7 +230,6 @@ func TestCachedConfig(t *testing.T) {
|
|||
}))
|
||||
// test get cached value from file
|
||||
{
|
||||
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
|
||||
time.Sleep(time.Second)
|
||||
_, exist := mgr.GetCachedValue("a.b")
|
||||
assert.False(t, exist)
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -70,8 +71,7 @@ func (r *refresher) refreshPeriodically(name string) {
|
|||
case <-ticker.C:
|
||||
err := r.fetchFunc()
|
||||
if err != nil {
|
||||
log.Error("can not pull configs", zap.Error(err))
|
||||
r.stop()
|
||||
log.Ctx(context.Background()).WithRateGroup("refresher", 1, 60).RatedWarn(60, "can not pull configs", zap.Error(err))
|
||||
}
|
||||
case <-r.intervalDone:
|
||||
log.Info("stop refreshing configurations", zap.String("source", name))
|
||||
|
|
Loading…
Reference in New Issue