mirror of https://github.com/milvus-io/milvus.git
Let MemoryKV.Load return error when key not exist (#15814)
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>pull/15640/head
parent
442e791128
commit
92c8e32ebd
|
@ -80,9 +80,8 @@ func (kv *MemoryKV) Load(key string) (string, error) {
|
|||
kv.RLock()
|
||||
defer kv.RUnlock()
|
||||
item := kv.tree.Get(memoryKVItem{key: key})
|
||||
// TODO,load unexisted key behavior is weird
|
||||
if item == nil {
|
||||
return "", nil
|
||||
return "", fmt.Errorf("invalid key: %s", key)
|
||||
}
|
||||
return item.(memoryKVItem).value.String(), nil
|
||||
}
|
||||
|
@ -92,9 +91,8 @@ func (kv *MemoryKV) LoadBytes(key string) ([]byte, error) {
|
|||
kv.RLock()
|
||||
defer kv.RUnlock()
|
||||
item := kv.tree.Get(memoryKVItem{key: key})
|
||||
// TODO,load unexisted key behavior is weird
|
||||
if item == nil {
|
||||
return []byte{}, nil
|
||||
return []byte{}, fmt.Errorf("invalid key: %s", key)
|
||||
}
|
||||
return item.(memoryKVItem).value.ByteSlice(), nil
|
||||
}
|
||||
|
@ -104,7 +102,6 @@ func (kv *MemoryKV) LoadWithDefault(key, defaultValue string) string {
|
|||
kv.RLock()
|
||||
defer kv.RUnlock()
|
||||
item := kv.tree.Get(memoryKVItem{key: key})
|
||||
|
||||
if item == nil {
|
||||
return defaultValue
|
||||
}
|
||||
|
@ -116,7 +113,6 @@ func (kv *MemoryKV) LoadBytesWithDefault(key string, defaultValue []byte) []byte
|
|||
kv.RLock()
|
||||
defer kv.RUnlock()
|
||||
item := kv.tree.Get(memoryKVItem{key: key})
|
||||
|
||||
if item == nil {
|
||||
return defaultValue
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ func TestMemoryKV_SaveAndLoadBytes(t *testing.T) {
|
|||
|
||||
noKey := "no_key"
|
||||
_value, err = mem.LoadBytes(noKey)
|
||||
assert.NoError(t, err)
|
||||
assert.Error(t, err)
|
||||
assert.Empty(t, _value)
|
||||
}
|
||||
|
||||
|
@ -167,7 +167,7 @@ func TestMemoryKV_MultiSaveBytesAndRemove(t *testing.T) {
|
|||
|
||||
_value, err := mem.LoadBytes(keys[0])
|
||||
assert.Empty(t, _value)
|
||||
assert.NoError(t, err)
|
||||
assert.Error(t, err)
|
||||
|
||||
_values, err := mem.MultiLoadBytes(keys[1:])
|
||||
assert.Equal(t, values[1:], _values)
|
||||
|
@ -255,6 +255,6 @@ func TestMemoryKV_GetSize(t *testing.T) {
|
|||
key2 := "TestMemoryKV_GetSize_key2"
|
||||
|
||||
size, err = memKV.GetSize(key2)
|
||||
assert.NoError(t, err)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, int64(0), size)
|
||||
}
|
||||
|
|
|
@ -406,12 +406,8 @@ func genQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
|
|||
}
|
||||
|
||||
func genLocalChunkManager() (storage.ChunkManager, error) {
|
||||
p, err := Params.Load("storage.path")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p := Params.LoadWithDefault("storage.path", "/tmp/milvus/data")
|
||||
lcm := storage.NewLocalChunkManager(storage.RootPath(p))
|
||||
|
||||
return lcm, nil
|
||||
}
|
||||
|
||||
|
@ -427,10 +423,7 @@ func genRemoteChunkManager(ctx context.Context) (storage.ChunkManager, error) {
|
|||
}
|
||||
|
||||
func genVectorChunkManager(ctx context.Context) (*storage.VectorChunkManager, error) {
|
||||
p, err := Params.Load("storage.path")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p := Params.LoadWithDefault("storage.path", "/tmp/milvus/data")
|
||||
lcm := storage.NewLocalChunkManager(storage.RootPath(p))
|
||||
|
||||
rcm, err := storage.NewMinioChunkManager(
|
||||
|
|
|
@ -94,7 +94,7 @@ func TestGetBinlogSize(t *testing.T) {
|
|||
|
||||
// key not in memoryKV
|
||||
size, err = GetBinlogSize(memoryKV, key)
|
||||
assert.NoError(t, err)
|
||||
assert.Error(t, err)
|
||||
assert.Zero(t, size)
|
||||
|
||||
// normal binlog key, for example, index binlog
|
||||
|
|
|
@ -34,7 +34,20 @@ import (
|
|||
// UniqueID is type alias of typeutil.UniqueID
|
||||
type UniqueID = typeutil.UniqueID
|
||||
|
||||
const envPrefix string = "milvus"
|
||||
const (
|
||||
DefaultMinioHost = "localhost"
|
||||
DefaultMinioPort = "9000"
|
||||
DefaultMinioAccessKey = "minioadmin"
|
||||
DefaultMinioSecretAccessKey = "minioadmin"
|
||||
DefaultMinioUseSSL = "false"
|
||||
DefaultMinioBucketName = "a-bucket"
|
||||
DefaultPulsarHost = "localhost"
|
||||
DefaultPulsarPort = "6650"
|
||||
DefaultEtcdEndpoints = "localhost:2379"
|
||||
DefaultRocksmqPath = "/var/lib/milvus/rdb_data"
|
||||
DefaultInsertBufferSize = "16777216"
|
||||
DefaultEnvPrefix = "milvus"
|
||||
)
|
||||
|
||||
// Base abstracts BaseTable
|
||||
// TODO: it's never used, consider to substitute BaseTable or to remove it
|
||||
|
@ -112,100 +125,72 @@ func (gp *BaseTable) loadFromMilvusYaml() {
|
|||
}
|
||||
|
||||
func (gp *BaseTable) tryloadFromEnv() {
|
||||
var err error
|
||||
// minio
|
||||
minioAddress := os.Getenv("MINIO_ADDRESS")
|
||||
if minioAddress == "" {
|
||||
minioHost, err := gp.Load("minio.address")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
port, err := gp.Load("minio.port")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
minioHost := gp.LoadWithDefault("minio.address", DefaultMinioHost)
|
||||
port := gp.LoadWithDefault("minio.port", DefaultMinioPort)
|
||||
minioAddress = minioHost + ":" + port
|
||||
}
|
||||
gp.Save("_MinioAddress", minioAddress)
|
||||
|
||||
// etcd
|
||||
etcdEndpoints := os.Getenv("ETCD_ENDPOINTS")
|
||||
if etcdEndpoints == "" {
|
||||
etcdEndpoints, err = gp.Load("etcd.endpoints")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
etcdEndpoints = gp.LoadWithDefault("etcd.endpoints", DefaultEtcdEndpoints)
|
||||
}
|
||||
gp.Save("_EtcdEndpoints", etcdEndpoints)
|
||||
|
||||
// pulsar
|
||||
pulsarAddress := os.Getenv("PULSAR_ADDRESS")
|
||||
if pulsarAddress == "" {
|
||||
pulsarHost, err := gp.Load("pulsar.address")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
port, err := gp.Load("pulsar.port")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pulsarHost := gp.LoadWithDefault("pulsar.address", DefaultPulsarHost)
|
||||
port := gp.LoadWithDefault("pulsar.port", DefaultPulsarPort)
|
||||
pulsarAddress = "pulsar://" + pulsarHost + ":" + port
|
||||
}
|
||||
gp.Save("_PulsarAddress", pulsarAddress)
|
||||
|
||||
// rocksmq
|
||||
rocksmqPath := os.Getenv("ROCKSMQ_PATH")
|
||||
if rocksmqPath == "" {
|
||||
path, err := gp.Load("rocksmq.path")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rocksmqPath = path
|
||||
rocksmqPath = gp.LoadWithDefault("rocksmq.path", DefaultRocksmqPath)
|
||||
}
|
||||
gp.Save("_RocksmqPath", rocksmqPath)
|
||||
|
||||
insertBufferFlushSize := os.Getenv("DATA_NODE_IBUFSIZE")
|
||||
if insertBufferFlushSize == "" {
|
||||
insertBufferFlushSize = gp.LoadWithDefault("datanode.flush.insertBufSize", "16777216")
|
||||
insertBufferFlushSize = gp.LoadWithDefault("datanode.flush.insertBufSize", DefaultInsertBufferSize)
|
||||
}
|
||||
gp.Save("_DATANODE_INSERTBUFSIZE", insertBufferFlushSize)
|
||||
|
||||
minioAccessKey := os.Getenv("MINIO_ACCESS_KEY")
|
||||
if minioAccessKey == "" {
|
||||
minioAccessKey, err = gp.Load("minio.accessKeyID")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
minioAccessKey = gp.LoadWithDefault("minio.accessKeyID", DefaultMinioAccessKey)
|
||||
}
|
||||
gp.Save("_MinioAccessKeyID", minioAccessKey)
|
||||
|
||||
minioSecretKey := os.Getenv("MINIO_SECRET_KEY")
|
||||
if minioSecretKey == "" {
|
||||
minioSecretKey, err = gp.Load("minio.secretAccessKey")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
minioSecretKey = gp.LoadWithDefault("minio.secretAccessKey", DefaultMinioSecretAccessKey)
|
||||
}
|
||||
gp.Save("_MinioSecretAccessKey", minioSecretKey)
|
||||
|
||||
minioUseSSL := os.Getenv("MINIO_USE_SSL")
|
||||
if minioUseSSL == "" {
|
||||
minioUseSSL, err = gp.Load("minio.useSSL")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
minioUseSSL = gp.LoadWithDefault("minio.useSSL", DefaultMinioUseSSL)
|
||||
}
|
||||
gp.Save("_MinioUseSSL", minioUseSSL)
|
||||
|
||||
minioBucketName := os.Getenv("MINIO_BUCKET_NAME")
|
||||
if minioBucketName == "" {
|
||||
minioBucketName, err = gp.Load("minio.bucketName")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
minioBucketName = gp.LoadWithDefault("minio.bucketName", DefaultMinioBucketName)
|
||||
}
|
||||
gp.Save("_MinioBucketName", minioBucketName)
|
||||
|
||||
// try to load environment start with ENV_PREFIX
|
||||
for _, e := range os.Environ() {
|
||||
parts := strings.SplitN(e, "=", 2)
|
||||
if strings.Contains(parts[0], envPrefix) {
|
||||
if strings.Contains(parts[0], DefaultEnvPrefix) {
|
||||
parts := strings.SplitN(e, "=", 2)
|
||||
// remove the ENV PREFIX and use the rest as key
|
||||
keyParts := strings.SplitAfterN(parts[0], ".", 2)
|
||||
|
|
|
@ -122,7 +122,7 @@ func TestBaseTable_LoadYaml(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
assert.Panics(t, func() { baseParams.LoadYaml("advanced/not_exist.yaml") })
|
||||
|
||||
_, err = baseParams.Load("etcd.address")
|
||||
_, err = baseParams.Load("etcd.endpoints")
|
||||
assert.Nil(t, err)
|
||||
_, err = baseParams.Load("pulsar.port")
|
||||
assert.Nil(t, err)
|
||||
|
|
Loading…
Reference in New Issue