enhance: Support to get the param value in the runtime (#29297)

/kind improvement
issue: #29299

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/29436/head
SimFG 2023-12-22 18:36:44 +08:00 committed by GitHub
parent 7a6aa8552a
commit dd9c61831d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 216 additions and 4 deletions

View File

@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/generic"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@ -339,6 +340,8 @@ func (mr *MilvusRoles) Run() {
paramtable.Init()
}
expr.Init()
expr.Register("param", paramtable.Get())
http.ServeHTTP()
setupPrometheusHTTPServer(Registry)

2
go.mod
View File

@ -104,6 +104,7 @@ require (
github.com/docker/go-units v0.4.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/expr-lang/expr v1.15.7 // indirect
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
@ -241,6 +242,7 @@ require (
replace (
github.com/apache/pulsar-client-go => github.com/milvus-io/pulsar-client-go v0.6.10
github.com/bketelsen/crypt => github.com/bketelsen/crypt v0.0.4 // Fix security alert for core-os/etcd
github.com/expr-lang/expr => github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
github.com/milvus-io/milvus/pkg => ./pkg
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1

2
go.sum
View File

@ -74,6 +74,8 @@ github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5 h1:U2V21xTXzCo7RpB1DHpc2X0SToiy/4PuZ/gEYd5/ytY=
github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ=
github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA=
github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc=

View File

@ -52,6 +52,7 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -227,6 +228,7 @@ func CreateServer(ctx context.Context, factory dependency.Factory, opts ...Optio
for _, opt := range opts {
opt(s)
}
expr.Register("datacoord", s)
return s
}

View File

@ -48,6 +48,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -144,6 +145,7 @@ func NewDataNode(ctx context.Context, factory dependency.Factory) *DataNode {
reportImportRetryTimes: 10,
}
node.UpdateStateCode(commonpb.StateCode_Abnormal)
expr.Register("datanode", node)
return node
}

View File

@ -91,7 +91,6 @@ func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error)
}
s.datanode = dn.NewDataNode(s.ctx, s.factory)
return s, nil
}

View File

@ -24,3 +24,6 @@ const LogLevelRouterPath = "/log/level"
// EventLogRouterPath is path for eventlog control.
const EventLogRouterPath = "/eventlog"
// ExprPath is path for expression.
const ExprPath = "/expr"

View File

@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/http/healthz"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -70,6 +71,21 @@ func registerDefaults() {
Path: EventLogRouterPath,
Handler: eventlog.Handler(),
})
Register(&Handler{
Path: ExprPath,
Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
code := req.URL.Query().Get("code")
auth := req.URL.Query().Get("auth")
output, err := expr.Exec(code, auth)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to execute expression, %s"}`, err.Error())))
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(fmt.Sprintf(`{"output": "%s"}`, output)))
}),
})
}
func Register(h *Handler) {

View File

@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/http/healthz"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -192,6 +193,31 @@ func (suite *HTTPServerTestSuite) TestPprofHandler() {
}
}
func (suite *HTTPServerTestSuite) TestExprHandler() {
expr.Init()
expr.Register("foo", "hello")
suite.Run("fail", func() {
url := "http://localhost:" + DefaultListenPort + ExprPath + "?code=foo"
client := http.Client{}
req, _ := http.NewRequest(http.MethodGet, url, nil)
resp, err := client.Do(req)
suite.Nil(err)
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
suite.True(strings.Contains(string(body), "failed to execute"))
})
suite.Run("success", func() {
url := "http://localhost:" + DefaultListenPort + ExprPath + "?auth=by-dev&code=foo"
client := http.Client{}
req, _ := http.NewRequest(http.MethodGet, url, nil)
resp, err := client.Do(req)
suite.Nil(err)
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
suite.True(strings.Contains(string(body), "hello"))
})
}
func TestHTTPServerSuite(t *testing.T) {
suite.Run(t, new(HTTPServerTestSuite))
}

View File

@ -53,6 +53,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -126,6 +127,7 @@ func NewIndexNode(ctx context.Context, factory dependency.Factory) *IndexNode {
sc := NewTaskScheduler(b.loopCtx)
b.sched = sc
expr.Register("indexnode", b)
return b
}

View File

@ -43,6 +43,7 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -144,6 +145,7 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) {
replicateStreamManager: replicateStreamManager,
}
node.UpdateStateCode(commonpb.StateCode_Abnormal)
expr.Register("proxy", node)
logutil.Logger(ctx).Debug("create a new Proxy instance", zap.Any("state", node.stateCode.Load()))
return node, nil
}

View File

@ -58,6 +58,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -135,6 +136,7 @@ func NewQueryCoord(ctx context.Context) (*Server, error) {
}
server.UpdateStateCode(commonpb.StateCode_Abnormal)
server.queryNodeCreator = session.DefaultQueryNodeCreator
expr.Register("querycoord", server)
return server, nil
}

View File

@ -64,6 +64,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/gc"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/lifetime"
@ -142,6 +143,7 @@ func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode {
}
node.tSafeManager = tsafe.NewTSafeReplica()
expr.Register("querynode", node)
return node
}

View File

@ -60,6 +60,7 @@ import (
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/crypto"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -147,6 +148,7 @@ func NewCore(c context.Context, factory dependency.Factory) (*Core, error) {
core.UpdateStateCode(commonpb.StateCode_Abnormal)
core.SetProxyCreator(proxyutil.DefaultProxyCreator)
expr.Register("rootcoord", core)
return core, nil
}

View File

@ -9,6 +9,7 @@ require (
github.com/cockroachdb/errors v1.9.1
github.com/confluentinc/confluent-kafka-go v1.9.1
github.com/containerd/cgroups v1.1.0
github.com/expr-lang/expr v1.15.7
github.com/golang/protobuf v1.5.3
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.16.5
@ -25,7 +26,7 @@ require (
github.com/spf13/cast v1.3.1
github.com/spf13/viper v1.8.1
github.com/streamnative/pulsarctl v0.5.0
github.com/stretchr/testify v1.8.3
github.com/stretchr/testify v1.8.4
github.com/tikv/client-go/v2 v2.0.4
github.com/uber/jaeger-client-go v2.30.0+incompatible
go.etcd.io/etcd/client/v3 v3.5.5
@ -171,6 +172,7 @@ require (
replace (
github.com/apache/pulsar-client-go => github.com/milvus-io/pulsar-client-go v0.6.10
github.com/bketelsen/crypt => github.com/bketelsen/crypt v0.0.4 // Fix security alert for core-os/etcd
github.com/expr-lang/expr => github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect

View File

@ -58,6 +58,8 @@ github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwS
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5 h1:U2V21xTXzCo7RpB1DHpc2X0SToiy/4PuZ/gEYd5/ytY=
github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ=
github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA=
github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc=
@ -687,8 +689,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=

80
pkg/util/expr/expr.go Normal file
View File

@ -0,0 +1,80 @@
/*
* Licensed to the LF AI & Data foundation under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package expr
import (
"fmt"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var (
v *vm.VM
env map[string]any
authKey string
)
func Init() {
v = &vm.VM{}
env = make(map[string]any)
authKey = paramtable.Get().EtcdCfg.RootPath.GetValue()
}
func Register(key string, value any) {
if env != nil {
env[key] = value
}
}
func Exec(code, auth string) (res string, err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic: %v", e)
}
}()
if v == nil {
return "", fmt.Errorf("the expr isn't inited")
}
if code == "" {
return "", fmt.Errorf("the expr code is empty")
}
if auth == "" {
return "", fmt.Errorf("the expr auth is empty")
}
if authKey != auth {
return "", fmt.Errorf("the expr auth is invalid")
}
program, err := expr.Compile(code, expr.Env(env))
if err != nil {
log.Warn("expr compile failed", zap.String("code", code), zap.Error(err))
return "", err
}
output, err := v.Run(program, env)
if err != nil {
log.Warn("expr run failed", zap.String("code", code), zap.Error(err))
return "", err
}
return fmt.Sprintf("%v", output), nil
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the LF AI & Data foundation under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package expr
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func TestExec(t *testing.T) {
paramtable.Init()
t.Run("not init", func(t *testing.T) {
_, err := Exec("1+1", "by-dev")
assert.Error(t, err)
})
Init()
Register("foo", "hello")
t.Run("empty code", func(t *testing.T) {
_, err := Exec("", "by-dev")
assert.Error(t, err)
})
t.Run("empty auth", func(t *testing.T) {
_, err := Exec("1+1", "")
assert.Error(t, err)
})
t.Run("invalid auth", func(t *testing.T) {
_, err := Exec("1+1", "000")
assert.Error(t, err)
})
t.Run("invalid code", func(t *testing.T) {
_, err := Exec("1+", "by-dev")
assert.Error(t, err)
})
t.Run("valid code", func(t *testing.T) {
out, err := Exec("foo", "by-dev")
assert.NoError(t, err)
assert.Equal(t, "hello", out)
})
}