fix: [2.2] Clean the compaction plan info to avoid the object leak (#29374)

issue: https://github.com/milvus-io/milvus/issues/29296
pr: https://github.com/milvus-io/milvus/pull/29365

---------

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/29565/head
SimFG 2023-12-21 20:20:44 +08:00 committed by GitHub
parent 739828a9ee
commit a1799a81ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 315 additions and 9 deletions

View File

@ -49,6 +49,7 @@ import (
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/expr"
)
var Params paramtable.ComponentParam
@ -293,12 +294,12 @@ func (mr *MilvusRoles) Run() {
local := mr.LocalMode
alias := mr.Alias
Params.Init()
// only standalone enable localMsg
if local {
if err := os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode); err != nil {
log.Error("Failed to set deploy mode: ", zap.Error(err))
}
Params.Init()
if rocksPath := Params.RocksmqPath(); rocksPath != "" {
if err := rocksmqimpl.InitRocksMQ(rocksPath); err != nil {
@ -323,6 +324,8 @@ func (mr *MilvusRoles) Run() {
}
}
expr.Init(Params.EtcdCfg.RootPath)
expr.Register("param", Params)
management.ServeHTTP()
if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.StandaloneDeployMode {

4
go.mod
View File

@ -16,6 +16,7 @@ require (
github.com/casbin/json-adapter/v2 v2.0.0
github.com/confluentinc/confluent-kafka-go v1.9.1
github.com/containerd/cgroups v1.0.2
github.com/expr-lang/expr v1.15.7
github.com/gin-gonic/gin v1.9.1
github.com/go-basic/ipv4 v1.0.0
github.com/gofrs/flock v0.8.1
@ -38,7 +39,7 @@ require (
github.com/spf13/cast v1.3.1
github.com/spf13/viper v1.8.1
github.com/streamnative/pulsarctl v0.5.0
github.com/stretchr/testify v1.8.3
github.com/stretchr/testify v1.8.4
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c
github.com/uber/jaeger-client-go v2.25.0+incompatible
go.etcd.io/etcd/api/v3 v3.5.5
@ -208,6 +209,7 @@ require (
replace (
github.com/apache/pulsar-client-go => github.com/milvus-io/pulsar-client-go v0.6.8
github.com/bketelsen/crypt => github.com/bketelsen/crypt v0.0.4 // Fix security alert for core-os/etcd
github.com/expr-lang/expr => github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect

5
go.sum
View File

@ -67,6 +67,8 @@ github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5 h1:U2V21xTXzCo7RpB1DHpc2X0SToiy/4PuZ/gEYd5/ytY=
github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ=
github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA=
github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc=
@ -768,8 +770,9 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=

View File

@ -134,18 +134,61 @@ func (c *compactionPlanHandler) start() {
log.Info("compaction handler quit")
return
case <-ticker.C:
cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ts, err := c.allocator.allocTimestamp(cctx)
ts, err := c.GetCurrentTS()
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
cancel()
log.Warn("unable to get current timestamp", zap.Error(err))
continue
}
cancel()
_ = c.updateCompaction(ts)
}
}
}()
go func() {
cleanTicker := time.NewTicker(30 * time.Minute)
defer cleanTicker.Stop()
for {
select {
case <-c.quit:
log.Info("Compaction handler quit clean")
return
case <-cleanTicker.C:
c.Clean()
}
}
}()
}
// Clean drops finished compaction plans from the in-memory plan map once they
// are old enough, so the map does not grow without bound. Plans that are still
// executing or pipelining are never removed. If the current timestamp cannot
// be obtained, the round is skipped.
func (c *compactionPlanHandler) Clean() {
	now, err := c.GetCurrentTS()
	if err != nil {
		log.Warn("fail to get current ts when clean", zap.Error(err))
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	for planID, t := range c.plans {
		inFlight := t.state == executing || t.state == pipelining
		// A plan is removed one hour after its own timeout has elapsed.
		if !inFlight && c.isTimeout(now, t.plan.GetStartTime(), t.plan.GetTimeoutInSeconds()+60*60) {
			delete(c.plans, planID)
		}
	}
}
// GetCurrentTS asks the allocator for a fresh timestamp. The request is bounded
// by Params.DataCoordCfg.CompactionRPCTimeout (interpreted as seconds). On
// failure the error is logged and returned together with a zero timestamp.
func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
	timeout := time.Duration(Params.DataCoordCfg.CompactionRPCTimeout) * time.Second
	allocCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	now, allocErr := c.allocator.allocTimestamp(allocCtx)
	if allocErr == nil {
		return now, nil
	}
	log.Warn("unable to alloc timestamp", zap.Error(allocErr))
	return 0, allocErr
}
func (c *compactionPlanHandler) stop() {

View File

@ -971,3 +971,42 @@ func getFieldBinlogPathsWithEntry(id int64, entry int64, paths ...string) *datap
}
return l
}
// TestCompactionPlanHandler_Clean checks that Clean keeps every plan when the
// allocator fails, and afterwards removes only the completed plan whose start
// time lies two hours in the past.
func TestCompactionPlanHandler_Clean(t *testing.T) {
	startTime := tsoutil.ComposeTSByTime(time.Now(), 0)
	cleanTime := tsoutil.ComposeTSByTime(time.Now().Add(-2*time.Hour), 0)

	// Two in-flight plans, one recently completed plan and one stale completed plan.
	plans := make(map[int64]*compactionTask, 4)
	plans[1] = &compactionTask{state: executing}
	plans[2] = &compactionTask{state: pipelining}
	plans[3] = &compactionTask{
		state: completed,
		plan:  &datapb.CompactionPlan{StartTime: startTime},
	}
	plans[4] = &compactionTask{
		state: completed,
		plan:  &datapb.CompactionPlan{StartTime: cleanTime},
	}

	a := newMockAllocator()
	c := &compactionPlanHandler{allocator: a, plans: plans}

	// Allocator failure: nothing may be removed.
	a.err = errors.New("mock error")
	c.Clean()
	assert.Len(t, c.plans, 4)

	// Allocator healthy and positioned at startTime: only the stale plan goes away.
	a.err = nil
	a.cnt = int64(startTime)
	c.Clean()
	assert.Len(t, c.plans, 3)
}

View File

@ -126,9 +126,13 @@ var _ allocator = (*MockAllocator)(nil)
// MockAllocator is a test allocator whose timestamps come from an in-memory
// counter and whose failures can be injected through err.
type MockAllocator struct {
// cnt is the counter that allocTimestamp atomically increments and returns.
cnt int64
// err, when non-nil, is returned by allocTimestamp instead of a timestamp.
err error
}
// allocTimestamp returns the injected error if one is set; otherwise it
// atomically bumps the counter and returns the new value as a Timestamp.
func (m *MockAllocator) allocTimestamp(ctx context.Context) (Timestamp, error) {
	if injected := m.err; injected != nil {
		return 0, injected
	}
	next := atomic.AddInt64(&m.cnt, 1)
	return Timestamp(next), nil
}

View File

@ -57,6 +57,7 @@ import (
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/expr"
)
const (
@ -213,6 +214,7 @@ func CreateServer(ctx context.Context, factory dependency.Factory, opts ...Optio
for _, opt := range opts {
opt(s)
}
expr.Register("datacoord", s)
return s
}

View File

@ -67,6 +67,7 @@ import (
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/expr"
)
const (
@ -146,6 +147,7 @@ func NewDataNode(ctx context.Context, factory dependency.Factory) *DataNode {
reportImportRetryTimes: 10,
}
node.UpdateStateCode(commonpb.StateCode_Abnormal)
expr.Register("datanode", node)
return node
}

View File

@ -58,6 +58,7 @@ import (
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/expr"
)
// make sure IndexCoord implements types.IndexCoord
@ -129,6 +130,7 @@ func NewIndexCoord(ctx context.Context, factory dependency.Factory) (*IndexCoord
indexGCLock: sync.RWMutex{},
}
i.UpdateStateCode(commonpb.StateCode_Abnormal)
expr.Register("indexcoord", i)
return i, nil
}

View File

@ -57,6 +57,7 @@ import (
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/expr"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
@ -123,6 +124,7 @@ func NewIndexNode(ctx context.Context, factory dependency.Factory) (*IndexNode,
}
b.sched = sc
expr.Register("indexnode", b)
return b, nil
}

View File

@ -21,3 +21,6 @@ const HealthzRouterPath = "/healthz"
// LogLevelRouterPath is path for Get and Update log level at runtime.
const LogLevelRouterPath = "/log/level"
// ExprPath is the router path of the HTTP handler that executes expressions at runtime.
const ExprPath = "/expr"

View File

@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/management/healthz"
"github.com/milvus-io/milvus/pkg/expr"
"go.uber.org/zap"
)
@ -49,6 +50,21 @@ func registerDefaults() {
Path: HealthzRouterPath,
Handler: healthz.Handler(),
})
// Register the expression endpoint: the handler reads the `code` and `auth`
// query parameters, passes them to expr.Exec and reports the result as JSON text.
// NOTE(review): `auth` travels in the URL query string, so it may end up in
// access logs or proxies — confirm this is acceptable for the deployment.
Register(&HTTPHandler{
Path: ExprPath,
Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
code := req.URL.Query().Get("code")
auth := req.URL.Query().Get("auth")
output, err := expr.Exec(code, auth)
if err != nil {
// Every failure (including a bad auth value) is answered with status 500.
w.WriteHeader(http.StatusInternalServerError)
// NOTE(review): err.Error() is interpolated without JSON escaping, so a
// message containing `"` or `\` yields an invalid JSON body.
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to execute expression, %s"}`, err.Error())))
return
}
w.WriteHeader(http.StatusOK)
// NOTE(review): output is likewise inserted verbatim between quotes; consider
// encoding the response with encoding/json.
w.Write([]byte(fmt.Sprintf(`{"output": "%s"}`, output)))
}),
})
}
func Register(h *HTTPHandler) {

View File

@ -20,14 +20,17 @@ import (
"bytes"
"context"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/management/healthz"
"github.com/milvus-io/milvus/pkg/expr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
@ -48,7 +51,6 @@ type HTTPServerTestSuite struct {
func (suite *HTTPServerTestSuite) SetupSuite() {
suite.server = httptest.NewServer(nil)
registerDefaults()
}
func (suite *HTTPServerTestSuite) TearDownSuite() {
@ -124,6 +126,31 @@ func (suite *HTTPServerTestSuite) TestHealthzHandler() {
suite.Equal("{\"state\":\"component m2 state is Abnormal\",\"detail\":[{\"name\":\"m1\",\"code\":1},{\"name\":\"m2\",\"code\":2}]}", string(body))
}
// TestExprHandler exercises the expression endpoint: a request without the auth
// parameter must report a failure, and an authenticated one must return the
// registered value.
func (suite *HTTPServerTestSuite) TestExprHandler() {
	expr.Init("by-dev")
	expr.Register("foo", "hello")

	// get issues a GET against the expression endpoint with the given raw query
	// suffix and returns the response body as a string.
	get := func(query string) string {
		target := suite.server.URL + ExprPath + query
		httpClient := http.Client{}
		request, _ := http.NewRequest(http.MethodGet, target, nil)
		resp, err := httpClient.Do(request)
		suite.Nil(err)
		defer resp.Body.Close()
		payload, _ := io.ReadAll(resp.Body)
		return string(payload)
	}

	suite.Run("fail", func() {
		suite.True(strings.Contains(get("?code=foo"), "failed to execute"))
	})
	suite.Run("success", func() {
		suite.True(strings.Contains(get("?auth=by-dev&code=foo"), "hello"))
	})
}
func TestHTTPServerSuite(t *testing.T) {
suite.Run(t, new(HTTPServerTestSuite))
}

View File

@ -47,6 +47,7 @@ import (
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/expr"
)
// UniqueID is alias of typeutil.UniqueID
@ -124,6 +125,7 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) {
multiRateLimiter: NewMultiRateLimiter(),
}
node.UpdateStateCode(commonpb.StateCode_Abnormal)
expr.Register("proxy", node)
logutil.Logger(ctx).Debug("create a new Proxy instance", zap.Any("state", node.stateCode.Load()))
return node, nil
}

View File

@ -58,6 +58,7 @@ import (
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/expr"
)
// Only for re-export
@ -129,6 +130,7 @@ func NewQueryCoord(ctx context.Context, factory dependency.Factory) (*Server, er
notifyNodeUp: make(chan struct{}),
}
server.UpdateStateCode(commonpb.StateCode_Abnormal)
expr.Register("querycoord", server)
return server, nil
}

View File

@ -63,6 +63,7 @@ import (
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/expr"
)
// make sure QueryNode implements types.QueryNode
@ -195,6 +196,7 @@ func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode {
queryNode.tSafeReplica = newTSafeReplica()
queryNode.scheduler = newTaskScheduler(ctx1, queryNode.tSafeReplica)
queryNode.UpdateStateCode(commonpb.StateCode_Abnormal)
expr.Register("querynode", queryNode)
return queryNode
}

View File

@ -67,6 +67,7 @@ import (
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/expr"
)
// UniqueID is an alias of typeutil.UniqueID.
@ -163,6 +164,7 @@ func NewCore(c context.Context, factory dependency.Factory) (*Core, error) {
}
return cli, nil
}
expr.Register("rootcoord", core)
return core, nil
}

View File

@ -73,6 +73,7 @@ type EtcdConfig struct {
// --- ETCD ---
Endpoints []string
RootPath string
MetaRootPath string
KvRootPath string
EtcdLogLevel string
@ -103,6 +104,8 @@ func (p *EtcdConfig) LoadCfgToMemory() {
} else {
p.initEndpoints()
}
p.initRootPath()
p.initMetaRootPath()
p.initKvRootPath()
p.initEtcdLogLevel()
@ -140,6 +143,14 @@ func (p *EtcdConfig) initEndpoints() {
p.Endpoints = strings.Split(endpoints, ",")
}
// initRootPath loads the "etcd.rootPath" setting into p.RootPath and panics if
// the key cannot be loaded.
func (p *EtcdConfig) initRootPath() {
	path, loadErr := p.Base.Load("etcd.rootPath")
	if loadErr != nil {
		panic(loadErr)
	}
	p.RootPath = path
}
func (p *EtcdConfig) initMetaRootPath() {
rootPath, err := p.Base.Load("etcd.rootPath")
if err != nil {

79
pkg/expr/expr.go Normal file
View File

@ -0,0 +1,79 @@
/*
* Licensed to the LF AI & Data foundation under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package expr
import (
	"crypto/subtle"
	"fmt"
	"sync"

	"github.com/expr-lang/expr"
	"github.com/expr-lang/expr/vm"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/log"
)
var (
	// mu guards v, env and authKey. Register is called from component
	// constructors while the HTTP handler may already be serving Exec calls,
	// so unsynchronized access to the map would be a data race.
	mu sync.RWMutex
	// v is non-nil once Init has been called; it marks the package as usable.
	v *vm.VM
	// env maps a registered name to the object exposed to expressions.
	env map[string]any
	// authKey is the value callers must present to Exec.
	authKey string
)

// Init (re)initializes the package: it resets the registered environment and
// stores rootPath as the key that Exec callers must supply.
func Init(rootPath string) {
	mu.Lock()
	defer mu.Unlock()
	v = &vm.VM{}
	env = make(map[string]any)
	authKey = rootPath
}

// Register exposes value to expressions under the name key. It is a no-op
// until Init has been called. Registering an existing key overwrites it.
func Register(key string, value any) {
	mu.Lock()
	defer mu.Unlock()
	if env != nil {
		env[key] = value
	}
}

// snapshotEnv returns a shallow copy of the registered environment so that an
// expression can be compiled and run without holding the package lock.
func snapshotEnv() map[string]any {
	mu.RLock()
	defer mu.RUnlock()
	copied := make(map[string]any, len(env))
	for name, obj := range env {
		copied[name] = obj
	}
	return copied
}

// Exec compiles and runs code against the registered environment and returns
// the result formatted with %v.
//
// An error is returned when the package is not initialized, when code or auth
// is empty, when auth does not match the key passed to Init, or when the
// expression fails to compile or run. A panic raised while evaluating the
// expression is recovered and reported as an error.
//
// Exec is safe for concurrent use: each call works on its own snapshot of the
// environment and its own VM instance.
func Exec(code, auth string) (res string, err error) {
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("panic: %v", e)
		}
	}()

	mu.RLock()
	inited, key := v != nil, authKey
	mu.RUnlock()

	if !inited {
		return "", fmt.Errorf("the expr isn't inited")
	}
	if code == "" {
		return "", fmt.Errorf("the expr code is empty")
	}
	if auth == "" {
		return "", fmt.Errorf("the expr auth is empty")
	}
	// Constant-time comparison: auth comes from an untrusted HTTP request.
	if subtle.ConstantTimeCompare([]byte(key), []byte(auth)) != 1 {
		return "", fmt.Errorf("the expr auth is invalid")
	}

	runEnv := snapshotEnv()
	program, err := expr.Compile(code, expr.Env(runEnv))
	if err != nil {
		log.Warn("expr compile failed", zap.String("code", code), zap.Error(err))
		return "", err
	}

	// A VM keeps per-run state, so a fresh one is used per call instead of
	// sharing the package-level instance between concurrent requests.
	machine := &vm.VM{}
	output, err := machine.Run(program, runEnv)
	if err != nil {
		log.Warn("expr run failed", zap.String("code", code), zap.Error(err))
		return "", err
	}
	return fmt.Sprintf("%v", output), nil
}

60
pkg/expr/expr_test.go Normal file
View File

@ -0,0 +1,60 @@
/*
* Licensed to the LF AI & Data foundation under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package expr
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestExec covers Exec before initialization, each rejection path after
// initialization, and a successful evaluation of a registered value.
func TestExec(t *testing.T) {
	// Must run before Init: Exec has to refuse to work on an uninitialized package.
	t.Run("not init", func(t *testing.T) {
		_, err := Exec("1+1", "by-dev")
		assert.Error(t, err)
	})

	Init("by-dev")
	Register("foo", "hello")

	rejected := []struct {
		name string
		code string
		auth string
	}{
		{name: "empty code", code: "", auth: "by-dev"},
		{name: "empty auth", code: "1+1", auth: ""},
		{name: "invalid auth", code: "1+1", auth: "000"},
		{name: "invalid code", code: "1+", auth: "by-dev"},
	}
	for _, tc := range rejected {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			_, err := Exec(tc.code, tc.auth)
			assert.Error(t, err)
		})
	}

	t.Run("valid code", func(t *testing.T) {
		out, err := Exec("foo", "by-dev")
		assert.NoError(t, err)
		assert.Equal(t, "hello", out)
	})
}