mirror of https://github.com/milvus-io/milvus.git
Fix golint warning of funcutil (#8652)
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/8723/merge
parent
b70665ed89
commit
6065bb50c4
|
@ -18,6 +18,8 @@ import (
|
|||
"os"
|
||||
"strconv"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/trace"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
|
@ -1773,7 +1775,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR
|
|||
}
|
||||
|
||||
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
|
||||
param, _ := GetAttrByKeyFromRepeatedKV("metric", request.GetParams())
|
||||
param, _ := funcutil.GetAttrByKeyFromRepeatedKV("metric", request.GetParams())
|
||||
metric, err := distance.ValidateMetricType(param)
|
||||
if err != nil {
|
||||
return &milvuspb.CalcDistanceResults{
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
PulsarMaxMessageSizeKey = "maxMessageSize"
|
||||
SuggestPulsarMaxMessageSizeKey = 5 * 1024 * 1024
|
||||
)
|
||||
|
||||
|
|
|
@ -1505,12 +1505,12 @@ func (st *searchTask) PreExecute(ctx context.Context) error {
|
|||
st.query.OutputFields = outputFields
|
||||
|
||||
if st.query.GetDslType() == commonpb.DslType_BoolExprV1 {
|
||||
annsField, err := GetAttrByKeyFromRepeatedKV(AnnsFieldKey, st.query.SearchParams)
|
||||
annsField, err := funcutil.GetAttrByKeyFromRepeatedKV(AnnsFieldKey, st.query.SearchParams)
|
||||
if err != nil {
|
||||
return errors.New(AnnsFieldKey + " not found in search_params")
|
||||
}
|
||||
|
||||
topKStr, err := GetAttrByKeyFromRepeatedKV(TopKKey, st.query.SearchParams)
|
||||
topKStr, err := funcutil.GetAttrByKeyFromRepeatedKV(TopKKey, st.query.SearchParams)
|
||||
if err != nil {
|
||||
return errors.New(TopKKey + " not found in search_params")
|
||||
}
|
||||
|
@ -1519,12 +1519,12 @@ func (st *searchTask) PreExecute(ctx context.Context) error {
|
|||
return errors.New(TopKKey + " " + topKStr + " is not invalid")
|
||||
}
|
||||
|
||||
metricType, err := GetAttrByKeyFromRepeatedKV(MetricTypeKey, st.query.SearchParams)
|
||||
metricType, err := funcutil.GetAttrByKeyFromRepeatedKV(MetricTypeKey, st.query.SearchParams)
|
||||
if err != nil {
|
||||
return errors.New(MetricTypeKey + " not found in search_params")
|
||||
}
|
||||
|
||||
searchParams, err := GetAttrByKeyFromRepeatedKV(SearchParamsKey, st.query.SearchParams)
|
||||
searchParams, err := funcutil.GetAttrByKeyFromRepeatedKV(SearchParamsKey, st.query.SearchParams)
|
||||
if err != nil {
|
||||
return errors.New(SearchParamsKey + " not found in search_params")
|
||||
}
|
||||
|
|
|
@ -1,93 +0,0 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
)
|
||||
|
||||
func GetPulsarConfig(protocol, ip, port, url string, args ...int64) (map[string]interface{}, error) {
|
||||
var resp *http.Response
|
||||
var err error
|
||||
|
||||
getResp := func() error {
|
||||
log.Debug("proxy util", zap.String("url", protocol+"://"+ip+":"+port+url))
|
||||
resp, err = http.Get(protocol + "://" + ip + ":" + port + url)
|
||||
return err
|
||||
}
|
||||
|
||||
var attempt uint = 10
|
||||
var interval time.Duration = time.Second
|
||||
if len(args) > 0 && args[0] > 0 {
|
||||
attempt = uint(args[0])
|
||||
}
|
||||
if len(args) > 1 && args[1] > 0 {
|
||||
interval = time.Duration(args[1])
|
||||
}
|
||||
|
||||
err = retry.Do(context.TODO(), getResp, retry.Attempts(attempt), retry.Sleep(interval))
|
||||
if err != nil {
|
||||
log.Debug("failed to get config", zap.String("error", err.Error()))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
log.Debug("get config", zap.String("config", string(body)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ret := make(map[string]interface{})
|
||||
err = json.Unmarshal(body, &ret)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func getMax(a, b int) int {
|
||||
if a > b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func getMin(a, b int) int {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func GetAttrByKeyFromRepeatedKV(key string, kvs []*commonpb.KeyValuePair) (string, error) {
|
||||
for _, kv := range kvs {
|
||||
if kv.Key == key {
|
||||
return kv.Value, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", errors.New("key " + key + " not found")
|
||||
}
|
|
@ -1,107 +0,0 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/jarcoal/httpmock"
|
||||
)
|
||||
|
||||
func TestGetPulsarConfig(t *testing.T) {
|
||||
httpmock.Activate()
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
runtimeConfig := make(map[string]interface{})
|
||||
runtimeConfig[PulsarMaxMessageSizeKey] = strconv.FormatInt(5*1024*1024, 10)
|
||||
|
||||
protocol := "http"
|
||||
ip := "pulsar"
|
||||
port := "18080"
|
||||
url := "/admin/v2/brokers/configuration/runtime"
|
||||
httpmock.RegisterResponder("GET", protocol+"://"+ip+":"+port+url,
|
||||
func(req *http.Request) (*http.Response, error) {
|
||||
return httpmock.NewJsonResponse(200, runtimeConfig)
|
||||
},
|
||||
)
|
||||
|
||||
ret, err := GetPulsarConfig(protocol, ip, port, url)
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, len(ret), len(runtimeConfig))
|
||||
assert.Equal(t, len(ret), 1)
|
||||
for key, value := range ret {
|
||||
assert.Equal(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", runtimeConfig[key]))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetAttrByKeyFromRepeatedKV(t *testing.T) {
|
||||
kvs := []*commonpb.KeyValuePair{
|
||||
{Key: "Key1", Value: "Value1"},
|
||||
{Key: "Key2", Value: "Value2"},
|
||||
{Key: "Key3", Value: "Value3"},
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
key string
|
||||
kvs []*commonpb.KeyValuePair
|
||||
value string
|
||||
errIsNil bool
|
||||
}{
|
||||
{"Key1", kvs, "Value1", true},
|
||||
{"Key2", kvs, "Value2", true},
|
||||
{"Key3", kvs, "Value3", true},
|
||||
{"other", kvs, "", false},
|
||||
}
|
||||
|
||||
for _, test := range cases {
|
||||
value, err := GetAttrByKeyFromRepeatedKV(test.key, test.kvs)
|
||||
assert.Equal(t, test.value, value)
|
||||
assert.Equal(t, test.errIsNil, err == nil)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUtil(t *testing.T) {
|
||||
a, b := 1, 2
|
||||
t.Run("getMax", func(t *testing.T) {
|
||||
ans := getMax(a, b)
|
||||
assert.Equal(t, b, ans)
|
||||
|
||||
ans = getMax(b, a)
|
||||
assert.Equal(t, b, ans)
|
||||
})
|
||||
|
||||
t.Run("getMin", func(t *testing.T) {
|
||||
ans := getMin(a, b)
|
||||
assert.Equal(t, a, ans)
|
||||
|
||||
ans = getMin(b, a)
|
||||
assert.Equal(t, a, ans)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetPulsarConfig_Error(t *testing.T) {
|
||||
protocol := "http"
|
||||
ip := "pulsar"
|
||||
port := "17777"
|
||||
url := "/admin/v2/brokers/configuration/runtime"
|
||||
|
||||
ret, err := GetPulsarConfig(protocol, ip, port, url, 1, 1)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, ret)
|
||||
}
|
|
@ -16,10 +16,15 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/go-basic/ipv4"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
|
@ -28,6 +33,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
)
|
||||
|
||||
// CheckGrpcReady wait for context timeout, or wait 100ms then send nil to targetCh
|
||||
func CheckGrpcReady(ctx context.Context, targetCh chan error) {
|
||||
select {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
|
@ -37,6 +43,7 @@ func CheckGrpcReady(ctx context.Context, targetCh chan error) {
|
|||
}
|
||||
}
|
||||
|
||||
// CheckPortAvailable check if a port is available to be listened on
|
||||
func CheckPortAvailable(port int) bool {
|
||||
addr := ":" + strconv.Itoa(port)
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
|
@ -46,6 +53,7 @@ func CheckPortAvailable(port int) bool {
|
|||
return err == nil
|
||||
}
|
||||
|
||||
// GetAvailablePort return an available port that can be listened on
|
||||
func GetAvailablePort() int {
|
||||
listener, err := net.Listen("tcp", ":0")
|
||||
if err != nil {
|
||||
|
@ -56,10 +64,12 @@ func GetAvailablePort() int {
|
|||
return listener.Addr().(*net.TCPAddr).Port
|
||||
}
|
||||
|
||||
// GetLocalIP return the local ip address
|
||||
func GetLocalIP() string {
|
||||
return ipv4.LocalIP()
|
||||
}
|
||||
|
||||
// WaitForComponentStates wait for component's state to be one of the specific states
|
||||
func WaitForComponentStates(ctx context.Context, service types.Component, serviceName string, states []internalpb.StateCode, attempts uint, sleep time.Duration) error {
|
||||
checkFunc := func() error {
|
||||
resp, err := service.GetComponentStates(ctx)
|
||||
|
@ -87,18 +97,22 @@ func WaitForComponentStates(ctx context.Context, service types.Component, servic
|
|||
return retry.Do(ctx, checkFunc, retry.Attempts(attempts), retry.Sleep(sleep))
|
||||
}
|
||||
|
||||
// WaitForComponentInitOrHealthy wait for component's state to be initializing or healthy
|
||||
func WaitForComponentInitOrHealthy(ctx context.Context, service types.Component, serviceName string, attempts uint, sleep time.Duration) error {
|
||||
return WaitForComponentStates(ctx, service, serviceName, []internalpb.StateCode{internalpb.StateCode_Initializing, internalpb.StateCode_Healthy}, attempts, sleep)
|
||||
}
|
||||
|
||||
// WaitForComponentInitOrHealthy wait for component's state to be initializing
|
||||
func WaitForComponentInit(ctx context.Context, service types.Component, serviceName string, attempts uint, sleep time.Duration) error {
|
||||
return WaitForComponentStates(ctx, service, serviceName, []internalpb.StateCode{internalpb.StateCode_Initializing}, attempts, sleep)
|
||||
}
|
||||
|
||||
// WaitForComponentInitOrHealthy wait for component's state to be healthy
|
||||
func WaitForComponentHealthy(ctx context.Context, service types.Component, serviceName string, attempts uint, sleep time.Duration) error {
|
||||
return WaitForComponentStates(ctx, service, serviceName, []internalpb.StateCode{internalpb.StateCode_Healthy}, attempts, sleep)
|
||||
}
|
||||
|
||||
// ParseIndexParamsMap parse the jsonic index parameters to map
|
||||
func ParseIndexParamsMap(mStr string) (map[string]string, error) {
|
||||
buffer := make(map[string]interface{})
|
||||
err := json.Unmarshal([]byte(mStr), &buffer)
|
||||
|
@ -112,3 +126,61 @@ func ParseIndexParamsMap(mStr string) (map[string]string, error) {
|
|||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
const (
|
||||
PulsarMaxMessageSizeKey = "maxMessageSize"
|
||||
)
|
||||
|
||||
// GetPulsarConfig get pulsar configuration using pulsar admin api
|
||||
func GetPulsarConfig(protocol, ip, port, url string, args ...int64) (map[string]interface{}, error) {
|
||||
var resp *http.Response
|
||||
var err error
|
||||
|
||||
getResp := func() error {
|
||||
log.Debug("function util", zap.String("url", protocol+"://"+ip+":"+port+url))
|
||||
resp, err = http.Get(protocol + "://" + ip + ":" + port + url)
|
||||
return err
|
||||
}
|
||||
|
||||
var attempt uint = 10
|
||||
var interval time.Duration = time.Second
|
||||
if len(args) > 0 && args[0] > 0 {
|
||||
attempt = uint(args[0])
|
||||
}
|
||||
if len(args) > 1 && args[1] > 0 {
|
||||
interval = time.Duration(args[1])
|
||||
}
|
||||
|
||||
err = retry.Do(context.TODO(), getResp, retry.Attempts(attempt), retry.Sleep(interval))
|
||||
if err != nil {
|
||||
log.Debug("failed to get config", zap.String("error", err.Error()))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
log.Debug("get config", zap.String("config", string(body)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ret := make(map[string]interface{})
|
||||
err = json.Unmarshal(body, &ret)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// GetAttrByKeyFromRepeatedKV return the value corresponding to key in kv pair
|
||||
func GetAttrByKeyFromRepeatedKV(key string, kvs []*commonpb.KeyValuePair) (string, error) {
|
||||
for _, kv := range kvs {
|
||||
if kv.Key == key {
|
||||
return kv.Value, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", errors.New("key " + key + " not found")
|
||||
}
|
||||
|
|
|
@ -15,10 +15,13 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jarcoal/httpmock"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
|
@ -197,3 +200,66 @@ func Test_ParseIndexParamsMap(t *testing.T) {
|
|||
_, err = ParseIndexParamsMap(invalidStr)
|
||||
assert.NotEqual(t, err, nil)
|
||||
}
|
||||
|
||||
func TestGetPulsarConfig(t *testing.T) {
|
||||
httpmock.Activate()
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
runtimeConfig := make(map[string]interface{})
|
||||
runtimeConfig[PulsarMaxMessageSizeKey] = strconv.FormatInt(5*1024*1024, 10)
|
||||
|
||||
protocol := "http"
|
||||
ip := "pulsar"
|
||||
port := "18080"
|
||||
url := "/admin/v2/brokers/configuration/runtime"
|
||||
httpmock.RegisterResponder("GET", protocol+"://"+ip+":"+port+url,
|
||||
func(req *http.Request) (*http.Response, error) {
|
||||
return httpmock.NewJsonResponse(200, runtimeConfig)
|
||||
},
|
||||
)
|
||||
|
||||
ret, err := GetPulsarConfig(protocol, ip, port, url)
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, len(ret), len(runtimeConfig))
|
||||
assert.Equal(t, len(ret), 1)
|
||||
for key, value := range ret {
|
||||
assert.Equal(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", runtimeConfig[key]))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPulsarConfig_Error(t *testing.T) {
|
||||
protocol := "http"
|
||||
ip := "pulsar"
|
||||
port := "17777"
|
||||
url := "/admin/v2/brokers/configuration/runtime"
|
||||
|
||||
ret, err := GetPulsarConfig(protocol, ip, port, url, 1, 1)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, ret)
|
||||
}
|
||||
|
||||
func TestGetAttrByKeyFromRepeatedKV(t *testing.T) {
|
||||
kvs := []*commonpb.KeyValuePair{
|
||||
{Key: "Key1", Value: "Value1"},
|
||||
{Key: "Key2", Value: "Value2"},
|
||||
{Key: "Key3", Value: "Value3"},
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
key string
|
||||
kvs []*commonpb.KeyValuePair
|
||||
value string
|
||||
errIsNil bool
|
||||
}{
|
||||
{"Key1", kvs, "Value1", true},
|
||||
{"Key2", kvs, "Value2", true},
|
||||
{"Key3", kvs, "Value3", true},
|
||||
{"other", kvs, "", false},
|
||||
}
|
||||
|
||||
for _, test := range cases {
|
||||
value, err := GetAttrByKeyFromRepeatedKV(test.key, test.kvs)
|
||||
assert.Equal(t, test.value, value)
|
||||
assert.Equal(t, test.errIsNil, err == nil)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue