Support receive signal from parent process (#27755)

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/27785/head
jaime 2023-10-18 20:18:24 +08:00 committed by GitHub
parent e5d0285d11
commit 928fb5006a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 139 additions and 42 deletions

View File

@ -20,6 +20,7 @@ import (
"log"
"os"
"os/exec"
"os/signal"
"golang.org/x/exp/slices"
@ -42,30 +43,54 @@ func main() {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// No need to extra wait for the process
err := cmd.Run()
if err := cmd.Start(); err != nil {
// Command not found on PATH, not executable, &c.
log.Fatal(err)
}
// wait for the command to finish
waitCh := make(chan error, 1)
go func() {
waitCh <- cmd.Wait()
close(waitCh)
}()
var params paramtable.ComponentParam
params.Init()
if len(args) >= 3 {
metaPath := params.EtcdCfg.MetaRootPath
endpoints := params.EtcdCfg.Endpoints
sc := make(chan os.Signal, 1)
signal.Notify(sc)
sessionSuffix := sessionutil.GetSessions(cmd.Process.Pid)
defer sessionutil.RemoveServerInfoFile(cmd.Process.Pid)
// You need a for loop to handle multiple signals
for {
select {
case sig := <-sc:
if err := cmd.Process.Signal(sig); err != nil {
log.Println("error sending signal", sig, err)
}
case err := <-waitCh:
// clean session
if len(args) >= 3 {
metaPath := params.EtcdCfg.MetaRootPath
endpoints := params.EtcdCfg.Endpoints
// clean session
if err := milvus.CleanSession(metaPath, endpoints, sessionSuffix); err != nil {
log.Println("clean session failed", err.Error())
sessionSuffix := sessionutil.GetSessions(cmd.Process.Pid)
defer sessionutil.RemoveServerInfoFile(cmd.Process.Pid)
// clean session
if err := milvus.CleanSession(metaPath, endpoints, sessionSuffix); err != nil {
log.Println("clean session failed", err.Error())
}
}
if err != nil {
log.Println("subprocess exit, ", err.Error())
} else {
log.Println("exit code:", cmd.ProcessState.ExitCode())
}
return
}
}
if err != nil {
log.Println("subprocess exit, ", err.Error())
} else {
log.Println("exit code:", cmd.ProcessState.ExitCode())
}
} else {
milvus.RunMilvus(os.Args)
}

View File

@ -2,6 +2,8 @@ package milvus
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
@ -9,9 +11,12 @@ import (
"os"
"path"
"runtime"
"strconv"
"strings"
"time"
"github.com/gofrs/flock"
"github.com/samber/lo"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
@ -203,11 +208,6 @@ func CleanSession(metaPath string, etcdEndpoints []string, sessionSuffix []strin
return nil
}
keys := getSessionPaths(metaPath, sessionSuffix)
if len(keys) == 0 {
return nil
}
etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
if err != nil {
return err
@ -216,6 +216,12 @@ func CleanSession(metaPath string, etcdEndpoints []string, sessionSuffix []strin
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
keys := getSessionPaths(ctx, etcdCli, metaPath, sessionSuffix)
if len(keys) == 0 {
return nil
}
for _, key := range keys {
_, _ = etcdCli.Delete(ctx, key, clientv3.WithPrefix())
}
@ -223,12 +229,86 @@ func CleanSession(metaPath string, etcdEndpoints []string, sessionSuffix []strin
return nil
}
func getSessionPaths(metaPath string, sessionSuffix []string) []string {
func getSessionPaths(ctx context.Context, client *clientv3.Client, metaPath string, sessionSuffix []string) []string {
sessionKeys := make([]string, 0)
sessionPathPrefix := path.Join(metaPath, sessionutil.DefaultServiceRoot)
for _, suffix := range sessionSuffix {
newSessionSuffixSet := addActiveKeySuffix(ctx, client, sessionPathPrefix, sessionSuffix)
for _, suffix := range newSessionSuffixSet {
key := path.Join(sessionPathPrefix, suffix)
sessionKeys = append(sessionKeys, key)
}
return sessionKeys
}
// filterUnmatchedKey skip active keys that don't match completed key, the latest active key may from standby server
func addActiveKeySuffix(ctx context.Context, client *clientv3.Client, sessionPathPrefix string, sessionSuffix []string) []string {
suffixSet := lo.SliceToMap(sessionSuffix, func(t string) (string, struct{}) {
return t, struct{}{}
})
for _, suffix := range sessionSuffix {
if strings.Contains(suffix, "-") && (strings.HasPrefix(suffix, typeutil.RootCoordRole) ||
strings.HasPrefix(suffix, typeutil.QueryCoordRole) || strings.HasPrefix(suffix, typeutil.DataCoordRole) ||
strings.HasPrefix(suffix, typeutil.IndexCoordRole)) {
res := strings.Split(suffix, "-")
if len(res) != 2 {
//skip illegal keys
log.Warn("skip illegal key", zap.String("suffix", suffix))
continue
}
serverType := res[0]
targetServerID, err := strconv.ParseInt(res[1], 10, 64)
if err != nil {
log.Warn("get server id failed from key", zap.String("suffix", suffix))
continue
}
key := path.Join(sessionPathPrefix, serverType)
serverID, err := getServerID(ctx, client, key)
if err != nil {
log.Warn("get server id failed from key", zap.String("suffix", suffix))
continue
}
if serverID == targetServerID {
log.Info("add active serverID key", zap.String("suffix", suffix), zap.String("key", key))
suffixSet[serverType] = struct{}{}
}
}
}
return lo.MapToSlice(suffixSet, func(key string, v struct{}) string { return key })
}
func getServerID(ctx context.Context, client *clientv3.Client, key string) (int64, error) {
resp, err := client.Get(ctx, key)
if err != nil {
return 0, err
}
if len(resp.Kvs) == 0 {
return 0, errors.New("not found value")
}
value := resp.Kvs[0].Value
// copy from Session UnmarshalJSON method
var raw struct {
ServerID int64 `json:"ServerID,omitempty"`
ServerName string `json:"ServerName,omitempty"`
Address string `json:"Address,omitempty"`
Exclusive bool `json:"Exclusive,omitempty"`
Stopping bool `json:"Stopping,omitempty"`
TriggerKill bool
Version string `json:"Version"`
}
err = json.Unmarshal(value, &raw)
if err != nil {
return 0, err
}
return raw.ServerID, nil
}

View File

@ -13,7 +13,7 @@ import (
"sync"
"time"
semver "github.com/blang/semver/v4"
"github.com/blang/semver/v4"
"go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
@ -24,7 +24,6 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
@ -1047,20 +1046,13 @@ func saveServerInfoInternal(role string, serverID int64, pid int) {
}
defer fd.Close()
data := fmt.Sprintf("%s-%d", role, serverID)
// remove active session if role is a coordinator
if role == typeutil.RootCoordRole || role == typeutil.QueryCoordRole ||
role == typeutil.DataCoordRole || role == typeutil.IndexCoordRole {
data = fmt.Sprintf("%s\n%s\n", data, role)
} else {
data = fmt.Sprintf("%s\n", data)
}
data := fmt.Sprintf("%s-%d\n", role, serverID)
_, err = fd.WriteString(data)
if err != nil {
log.Warn("write server info file fail", zap.String("filePath", fileFullPath), zap.Error(err))
}
log.Info("save into server info to file", zap.String("content", data), zap.String("filePath", fileFullPath))
log.Info("save server info into file", zap.String("content", data), zap.String("filePath", fileFullPath))
}
func SaveServerInfo(role string, serverID int64) {

View File

@ -16,7 +16,7 @@ import (
"testing"
"time"
semver "github.com/blang/semver/v4"
"github.com/blang/semver/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
@ -913,11 +913,11 @@ func TestServerInfoOp(t *testing.T) {
saveServerInfoInternal(typeutil.ProxyRole, serverID, pid)
sessions := GetSessions(pid)
assert.Equal(t, 7, len(sessions))
assert.Equal(t, 4, len(sessions))
assert.ElementsMatch(t, sessions, []string{
"querycoord", "querycoord-999",
"datacoord", "datacoord-999",
"indexcoord", "indexcoord-999",
"querycoord-999",
"datacoord-999",
"indexcoord-999",
"proxy-999"})
RemoveServerInfoFile(pid)
@ -932,6 +932,6 @@ func TestServerInfoOp(t *testing.T) {
SaveServerInfo(typeutil.QueryCoordRole, serverID)
sessions := GetSessions(os.Getpid())
assert.Equal(t, 2, len(sessions))
assert.Equal(t, 1, len(sessions))
})
}

View File

@ -15,11 +15,11 @@
# limitations under the License.
echo "Stopping milvus..."
PROCESS=$(ps -e | grep milvus | grep -v grep | grep -v run-with-subprocess | awk '{print $1}')
PROCESS=$(ps -e | grep milvus | grep -v grep | grep run-with-subprocess | awk '{print $1}')
if [ -z "$PROCESS" ]; then
echo "No milvus process"
exit 0
fi
kill -9 $PROCESS
kill -15 $PROCESS
echo "Milvus stopped"