enhance: [2.5] dump pprof info if component stop progress timeout (#39760)

issue: #39735
pr: #39726

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2025-02-13 15:30:47 +08:00 committed by GitHub
parent df28d2200d
commit a50249e0bb
8 changed files with 168 additions and 9 deletions

View File

@ -2,20 +2,27 @@ package components
import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"runtime/pprof"
	"time"

	"github.com/cockroachdb/errors"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/util/conc"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var errStopTimeout = errors.New("stop timeout")

// exitWhenStopTimeout stops a component with a timeout and exits the process when the stop times out.
func exitWhenStopTimeout(stop func() error, timeout time.Duration) error {
-	err := stopWithTimeout(stop, timeout)
+	err := dumpPprof(func() error { return stopWithTimeout(stop, timeout) })
	if errors.Is(err, errStopTimeout) {
		log.Info("stop progress timeout, force exit")
		os.Exit(1)
	}
	return err
@ -27,7 +34,7 @@ func stopWithTimeout(stop func() error, timeout time.Duration) error {
	defer cancel()

	future := conc.Go(func() (struct{}, error) {
-		return struct{}{}, stop()
+		return struct{}{}, dumpPprof(stop)
	})
	select {
	case <-future.Inner():
@ -36,3 +43,125 @@ func stopWithTimeout(stop func() error, timeout time.Duration) error {
		return errStopTimeout
	}
}

// profileType defines the structure for each type of profile to be collected
type profileType struct {
	name     string               // Name of the profile type
	filename string               // File path for the profile
	dump     func(*os.File) error // Function to dump the profile data
}

// dumpPprof wraps the execution of a function with pprof profiling
// It collects various performance profiles only if the execution fails
func dumpPprof(exec func() error) error {
	// Get pprof directory from configuration
	pprofDir := paramtable.Get().ServiceParam.ProfileCfg.PprofPath.GetValue()
	if err := os.MkdirAll(pprofDir, 0o755); err != nil {
		log.Error("failed to create pprof directory", zap.Error(err))
		return exec()
	}

	// Generate base file path with timestamp
	baseFilePath := filepath.Join(
		pprofDir,
		fmt.Sprintf("%s_pprof_%s",
			paramtable.GetRole(),
			time.Now().Format("20060102_150405"),
		),
	)

	// Define all profile types to be collected
	profiles := []profileType{
		{
			name:     "cpu",
			filename: baseFilePath + "_cpu.prof",
			dump: func(f *os.File) error {
				// Ensure no other CPU profiling is active before starting a new one.
				// This prevents the "cpu profiling already in use" error.
				pprof.StopCPUProfile()
				return pprof.StartCPUProfile(f)
			},
		},
		{
			name:     "goroutine",
			filename: baseFilePath + "_goroutine.prof",
			dump: func(f *os.File) error {
				return pprof.Lookup("goroutine").WriteTo(f, 0)
			},
		},
		{
			name:     "heap",
			filename: baseFilePath + "_heap.prof",
			dump: func(f *os.File) error {
				return pprof.WriteHeapProfile(f)
			},
		},
		{
			name:     "block",
			filename: baseFilePath + "_block.prof",
			dump: func(f *os.File) error {
				return pprof.Lookup("block").WriteTo(f, 0)
			},
		},
		{
			name:     "mutex",
			filename: baseFilePath + "_mutex.prof",
			dump: func(f *os.File) error {
				return pprof.Lookup("mutex").WriteTo(f, 0)
			},
		},
	}

	// Create all profile files and store file handles
	files := make(map[string]*os.File)
	for _, p := range profiles {
		f, err := os.Create(p.filename)
		if err != nil {
			log.Error("could not create profile file",
				zap.String("profile", p.name),
				zap.Error(err))
			for filename, f := range files {
				f.Close()
				os.Remove(filename)
			}
			return exec()
		}
		files[p.filename] = f
	}

	// Ensure all files are closed when function returns
	defer func() {
		for _, f := range files {
			f.Close()
		}
	}()

	// Start CPU profiling
	cpuProfile := profiles[0]
	if err := cpuProfile.dump(files[cpuProfile.filename]); err != nil {
		log.Error("could not start CPU profiling", zap.Error(err))
		return exec()
	}
	defer pprof.StopCPUProfile()

	// Execute the target function
	execErr := exec()

	// Only save profiles and collect additional data if execution fails
	if execErr != nil {
		// Start from index 1 to skip CPU profile (already running)
		for _, p := range profiles[1:] {
			if err := p.dump(files[p.filename]); err != nil {
				log.Error("could not write profile",
					zap.String("profile", p.name),
					zap.Error(err))
			}
		}
	} else {
		// Remove all files if execution succeeds
		for _, p := range profiles {
			os.Remove(p.filename)
		}
	}

	return execErr
}
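
Note: a minimal, self-contained sketch of the profile-on-failure pattern that dumpPprof implements above: start CPU profiling up front, run the wrapped function, and persist profiles only when it fails. The directory, file names, and the simulated failure are illustrative assumptions, not code from this commit.

package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"runtime/pprof"
	"time"
)

// profileOnFailure mirrors the shape of dumpPprof: profiles are collected
// eagerly but persisted only if exec returns an error.
func profileOnFailure(dir string, exec func() error) error {
	base := filepath.Join(dir, fmt.Sprintf("demo_pprof_%s", time.Now().Format("20060102_150405")))

	cpuFile, err := os.Create(base + "_cpu.prof")
	if err != nil {
		return exec() // profiling is best-effort: fall back to the plain call
	}
	if err := pprof.StartCPUProfile(cpuFile); err != nil {
		cpuFile.Close()
		return exec()
	}

	execErr := exec()

	// Flush CPU samples before deciding whether to keep the file.
	pprof.StopCPUProfile()
	cpuFile.Close()

	if execErr == nil {
		os.Remove(cpuFile.Name()) // success: discard the profile
		return nil
	}

	// Failure: also snapshot goroutines, as dumpPprof does for its extra profiles.
	if gf, err := os.Create(base + "_goroutine.prof"); err == nil {
		pprof.Lookup("goroutine").WriteTo(gf, 0)
		gf.Close()
	}
	return execErr
}

func main() {
	err := profileOnFailure(os.TempDir(), func() error {
		return errors.New("simulated stop timeout")
	})
	fmt.Println("exec returned:", err)
}

The dumped files can then be inspected with the standard tooling, e.g. go tool pprof <file>_cpu.prof.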

View File

@ -775,7 +775,7 @@ log:
grpc:
  log:
    level: WARNING
-  gracefulStopTimeout: 10 # second, time to wait graceful stop finish
+  gracefulStopTimeout: 3 # second, time to wait graceful stop finish
  client:
    compressionEnabled: false
    dialTimeout: 200

View File

@ -1191,7 +1191,7 @@ func Test_Service_GracefulStop(t *testing.T) {
	mockProxy.ExpectedCalls = nil
	mockProxy.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Run(func(_a0 context.Context, _a1 *milvuspb.GetComponentStatesRequest) {
		fmt.Println("rpc start")
-		time.Sleep(10 * time.Second)
+		time.Sleep(3 * time.Second)
		atomic.AddInt32(&count, 1)
		fmt.Println("rpc done")
	}).Return(&milvuspb.ComponentStates{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil)

View File

@ -23,7 +23,7 @@ func GracefulStopGRPCServer(s *grpc.Server) {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
-		log.Debug("try to graceful stop grpc server...")
+		log.Info("try to graceful stop grpc server...")
		// will block until all rpc finished.
		s.GracefulStop()
	}()
@ -31,7 +31,7 @@ func GracefulStopGRPCServer(s *grpc.Server) {
	case <-ch:
	case <-time.After(paramtable.Get().ProxyGrpcServerCfg.GracefulStopTimeout.GetAsDuration(time.Second)):
		// took too long, manually close grpc server
-		log.Debug("stop grpc server...")
+		log.Info("force to stop grpc server...")
		s.Stop()
		// concurrent GracefulStop should be interrupted
		<-ch
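
Note: the hunks above only change log levels and wording; the stop-with-deadline pattern itself is unchanged. A self-contained sketch of that pattern, assuming an idle server and hard-coding 3 seconds where the real code reads grpc.gracefulStopTimeout via paramtable:

package main

import (
	"log"
	"time"

	"google.golang.org/grpc"
)

// gracefulStopWithDeadline tries GracefulStop first and falls back to a hard
// Stop once the deadline passes, matching GracefulStopGRPCServer above.
func gracefulStopWithDeadline(s *grpc.Server, timeout time.Duration) {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		s.GracefulStop() // blocks until all in-flight RPCs finish
	}()
	select {
	case <-ch:
		log.Println("grpc server stopped gracefully")
	case <-time.After(timeout):
		log.Println("force to stop grpc server...")
		s.Stop() // interrupts the concurrent GracefulStop
		<-ch
	}
}

func main() {
	// With no active RPCs, GracefulStop returns immediately.
	gracefulStopWithDeadline(grpc.NewServer(), 3*time.Second)
}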

View File

@ -203,7 +203,7 @@ func (p *GrpcServerConfig) Init(domain string, base *BaseTable) {
	p.GracefulStopTimeout = ParamItem{
		Key:          "grpc.gracefulStopTimeout",
		Version:      "2.3.1",
-		DefaultValue: "10",
+		DefaultValue: "3",
		Doc:          "second, time to wait graceful stop finish",
		Export:       true,
	}

View File

@ -67,8 +67,7 @@ func TestGrpcServerParams(t *testing.T) {
base.Save("grpc.serverMaxSendSize", "a")
assert.Equal(t, serverConfig.ServerMaxSendSize.GetAsInt(), DefaultServerMaxSendSize)
base.Save(serverConfig.GracefulStopTimeout.Key, "1")
assert.Equal(t, serverConfig.GracefulStopTimeout.GetAsInt(), 1)
assert.Equal(t, serverConfig.GracefulStopTimeout.GetAsInt(), 3)
}
func TestGrpcClientParams(t *testing.T) {

View File

@ -51,6 +51,7 @@ type ServiceParam struct {
	RocksmqCfg RocksmqConfig
	NatsmqCfg  NatsmqConfig
	MinioCfg   MinioConfig
	ProfileCfg ProfileConfig
}

func (p *ServiceParam) init(bt *BaseTable) {
@ -64,6 +65,7 @@ func (p *ServiceParam) init(bt *BaseTable) {
	p.RocksmqCfg.Init(bt)
	p.NatsmqCfg.Init(bt)
	p.MinioCfg.Init(bt)
	p.ProfileCfg.Init(bt)
}

func (p *ServiceParam) RocksmqEnable() bool {
@ -1402,3 +1404,25 @@ Leave it empty if you want to use AWS default endpoint`,
	}
	p.ListObjectsMaxKeys.Init(base.mgr)
}

// profile config
type ProfileConfig struct {
	PprofPath ParamItem `refreshable:"false"`
}

func (p *ProfileConfig) Init(base *BaseTable) {
	p.PprofPath = ParamItem{
		Key:          "profile.pprof.path",
		Version:      "2.5.5",
		DefaultValue: "",
		Doc:          "The folder for storing pprof files; defaults to localStoragePath/pprof",
		Formatter: func(v string) string {
			if len(v) == 0 {
				return path.Join(base.Get("localStorage.path"), "pprof")
			}
			return v
		},
		Export: true,
	}
	p.PprofPath.Init(base.mgr)
}
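
Note: a standalone sketch of the Formatter fallback above; resolvePprofPath is a hypothetical helper, not part of the Milvus codebase. The expected outputs match the default asserted in the test below.

package main

import (
	"fmt"
	"path"
)

// resolvePprofPath reproduces the PprofPath formatter: an empty configured
// value falls back to <localStorage.path>/pprof, an explicit value wins.
func resolvePprofPath(configured, localStoragePath string) string {
	if len(configured) == 0 {
		return path.Join(localStoragePath, "pprof")
	}
	return configured
}

func main() {
	fmt.Println(resolvePprofPath("", "/var/lib/milvus/data"))           // /var/lib/milvus/data/pprof
	fmt.Println(resolvePprofPath("/tmp/pprof", "/var/lib/milvus/data")) // /tmp/pprof
}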

View File

@ -232,4 +232,11 @@ func TestServiceParam(t *testing.T) {
		assert.Equal(t, 100000, Params.PaginationSize.GetAsInt())
		assert.Equal(t, 32, Params.ReadConcurrency.GetAsInt())
	})

	t.Run("test profile config", func(t *testing.T) {
		params := &SParams.ProfileCfg
		assert.Equal(t, "/var/lib/milvus/data/pprof", params.PprofPath.GetValue())
		bt.Save(params.PprofPath.Key, "/tmp/pprof")
		assert.Equal(t, "/tmp/pprof", params.PprofPath.GetValue())
	})
}