mirror of https://github.com/milvus-io/milvus.git
Support receive signals from parent process (#27756)
Signed-off-by: jaime <yun.zhang@zilliz.com>pull/27744/head
parent
c0edc22a6e
commit
ac2d1bb5c2
66
cmd/main.go
66
cmd/main.go
|
@ -20,6 +20,7 @@ import (
|
|||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
|
@ -43,29 +44,52 @@ func main() {
|
|||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
|
||||
// No need to extra wait for the process
|
||||
err := cmd.Run()
|
||||
|
||||
// clean session
|
||||
paramtable.Init()
|
||||
params := paramtable.Get()
|
||||
if len(args) >= 3 {
|
||||
metaPath := params.EtcdCfg.MetaRootPath.GetValue()
|
||||
endpoints := params.EtcdCfg.Endpoints.GetValue()
|
||||
etcdEndpoints := strings.Split(endpoints, ",")
|
||||
|
||||
sessionSuffix := sessionutil.GetSessions(cmd.Process.Pid)
|
||||
defer sessionutil.RemoveServerInfoFile(cmd.Process.Pid)
|
||||
|
||||
if err := milvus.CleanSession(metaPath, etcdEndpoints, sessionSuffix); err != nil {
|
||||
log.Println("clean session failed", err.Error())
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
// Command not found on PATH, not executable, &c.
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Println("subprocess exit, ", err.Error())
|
||||
} else {
|
||||
log.Println("exit code:", cmd.ProcessState.ExitCode())
|
||||
// wait for the command to finish
|
||||
waitCh := make(chan error, 1)
|
||||
go func() {
|
||||
waitCh <- cmd.Wait()
|
||||
close(waitCh)
|
||||
}()
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
signal.Notify(sc)
|
||||
|
||||
// Need a for loop to handle multiple signals
|
||||
for {
|
||||
select {
|
||||
case sig := <-sc:
|
||||
if err := cmd.Process.Signal(sig); err != nil {
|
||||
log.Println("error sending signal", sig, err)
|
||||
}
|
||||
case err := <-waitCh:
|
||||
// clean session
|
||||
paramtable.Init()
|
||||
params := paramtable.Get()
|
||||
if len(args) >= 3 {
|
||||
metaPath := params.EtcdCfg.MetaRootPath.GetValue()
|
||||
endpoints := params.EtcdCfg.Endpoints.GetValue()
|
||||
etcdEndpoints := strings.Split(endpoints, ",")
|
||||
|
||||
sessionSuffix := sessionutil.GetSessions(cmd.Process.Pid)
|
||||
defer sessionutil.RemoveServerInfoFile(cmd.Process.Pid)
|
||||
|
||||
if err := milvus.CleanSession(metaPath, etcdEndpoints, sessionSuffix); err != nil {
|
||||
log.Println("clean session failed", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Println("subprocess exit, ", err.Error())
|
||||
} else {
|
||||
log.Println("exit code:", cmd.ProcessState.ExitCode())
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
milvus.RunMilvus(os.Args)
|
||||
|
|
|
@ -2,15 +2,20 @@ package milvus
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gofrs/flock"
|
||||
"github.com/samber/lo"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/zap"
|
||||
|
||||
|
@ -205,11 +210,6 @@ func CleanSession(metaPath string, etcdEndpoints []string, sessionSuffix []strin
|
|||
return nil
|
||||
}
|
||||
|
||||
keys := getSessionPaths(metaPath, sessionSuffix)
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -218,6 +218,12 @@ func CleanSession(metaPath string, etcdEndpoints []string, sessionSuffix []strin
|
|||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
keys := getSessionPaths(ctx, etcdCli, metaPath, sessionSuffix)
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
_, _ = etcdCli.Delete(ctx, key, clientv3.WithPrefix())
|
||||
}
|
||||
|
@ -225,12 +231,79 @@ func CleanSession(metaPath string, etcdEndpoints []string, sessionSuffix []strin
|
|||
return nil
|
||||
}
|
||||
|
||||
func getSessionPaths(metaPath string, sessionSuffix []string) []string {
|
||||
func getSessionPaths(ctx context.Context, client *clientv3.Client, metaPath string, sessionSuffix []string) []string {
|
||||
sessionKeys := make([]string, 0)
|
||||
sessionPathPrefix := path.Join(metaPath, sessionutil.DefaultServiceRoot)
|
||||
for _, suffix := range sessionSuffix {
|
||||
newSessionSuffixSet := addActiveKeySuffix(ctx, client, sessionPathPrefix, sessionSuffix)
|
||||
for _, suffix := range newSessionSuffixSet {
|
||||
key := path.Join(sessionPathPrefix, suffix)
|
||||
sessionKeys = append(sessionKeys, key)
|
||||
}
|
||||
return sessionKeys
|
||||
}
|
||||
|
||||
// filterUnmatchedKey skip active keys that don't match completed key, the latest active key may from standby server
|
||||
func addActiveKeySuffix(ctx context.Context, client *clientv3.Client, sessionPathPrefix string, sessionSuffix []string) []string {
|
||||
suffixSet := lo.SliceToMap(sessionSuffix, func(t string) (string, struct{}) {
|
||||
return t, struct{}{}
|
||||
})
|
||||
|
||||
for _, suffix := range sessionSuffix {
|
||||
if strings.Contains(suffix, "-") && (strings.HasPrefix(suffix, typeutil.RootCoordRole) ||
|
||||
strings.HasPrefix(suffix, typeutil.QueryCoordRole) || strings.HasPrefix(suffix, typeutil.DataCoordRole) ||
|
||||
strings.HasPrefix(suffix, typeutil.IndexCoordRole)) {
|
||||
res := strings.Split(suffix, "-")
|
||||
if len(res) != 2 {
|
||||
// skip illegal keys
|
||||
log.Warn("skip illegal key", zap.String("suffix", suffix))
|
||||
continue
|
||||
}
|
||||
|
||||
serverType := res[0]
|
||||
targetServerID, err := strconv.ParseInt(res[1], 10, 64)
|
||||
if err != nil {
|
||||
log.Warn("get server id failed from key", zap.String("suffix", suffix), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
key := path.Join(sessionPathPrefix, serverType)
|
||||
serverID, err := getServerID(ctx, client, key)
|
||||
if err != nil {
|
||||
log.Warn("get server id failed from key", zap.String("suffix", suffix), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
if serverID == targetServerID {
|
||||
log.Info("add active serverID key", zap.String("suffix", suffix), zap.String("key", key))
|
||||
suffixSet[serverType] = struct{}{}
|
||||
}
|
||||
|
||||
// also remove a faked indexcoord seesion if role is a datacoord
|
||||
if strings.HasPrefix(suffix, typeutil.DataCoordRole) {
|
||||
suffixSet[typeutil.IndexCoordRole] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return lo.MapToSlice(suffixSet, func(key string, v struct{}) string { return key })
|
||||
}
|
||||
|
||||
func getServerID(ctx context.Context, client *clientv3.Client, key string) (int64, error) {
|
||||
resp, err := client.Get(ctx, key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if len(resp.Kvs) == 0 {
|
||||
return 0, errors.New("not found value")
|
||||
}
|
||||
|
||||
value := resp.Kvs[0].Value
|
||||
session := &sessionutil.SessionRaw{}
|
||||
err = json.Unmarshal(value, &session)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return session.ServerID, nil
|
||||
}
|
||||
|
|
|
@ -40,7 +40,6 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -1159,22 +1158,13 @@ func saveServerInfoInternal(role string, serverID int64, pid int) {
|
|||
}
|
||||
defer fd.Close()
|
||||
|
||||
data := fmt.Sprintf("%s-%d", role, serverID)
|
||||
// remove active session if role is a coordinator
|
||||
if role == typeutil.RootCoordRole || role == typeutil.QueryCoordRole {
|
||||
data = fmt.Sprintf("%s\n%s\n", data, role)
|
||||
} else if role == typeutil.DataCoordRole {
|
||||
// also remove a faked indexcoord seesion if role is a datacoord coord
|
||||
data = fmt.Sprintf("%s\n%s\n%s\n", data, role, typeutil.IndexCoordRole)
|
||||
} else {
|
||||
data = fmt.Sprintf("%s\n", data)
|
||||
}
|
||||
data := fmt.Sprintf("%s-%d\n", role, serverID)
|
||||
_, err = fd.WriteString(data)
|
||||
if err != nil {
|
||||
log.Warn("write server info file fail", zap.String("filePath", fileFullPath), zap.Error(err))
|
||||
}
|
||||
|
||||
log.Info("save into server info to file", zap.String("content", data), zap.String("filePath", fileFullPath))
|
||||
log.Info("save server info into file", zap.String("content", data), zap.String("filePath", fileFullPath))
|
||||
}
|
||||
|
||||
func SaveServerInfo(role string, serverID int64) {
|
||||
|
|
|
@ -712,11 +712,10 @@ func TestServerInfoOp(t *testing.T) {
|
|||
saveServerInfoInternal(typeutil.ProxyRole, serverID, pid)
|
||||
|
||||
sessions := GetSessions(pid)
|
||||
assert.Equal(t, 6, len(sessions))
|
||||
assert.Equal(t, 3, len(sessions))
|
||||
assert.ElementsMatch(t, sessions, []string{
|
||||
"querycoord", "querycoord-999",
|
||||
"datacoord", "datacoord-999",
|
||||
"indexcoord",
|
||||
"querycoord-999",
|
||||
"datacoord-999",
|
||||
"proxy-999",
|
||||
})
|
||||
|
||||
|
@ -732,7 +731,7 @@ func TestServerInfoOp(t *testing.T) {
|
|||
|
||||
SaveServerInfo(typeutil.QueryCoordRole, serverID)
|
||||
sessions := GetSessions(os.Getpid())
|
||||
assert.Equal(t, 2, len(sessions))
|
||||
assert.Equal(t, 1, len(sessions))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -15,11 +15,11 @@
|
|||
# limitations under the License.
|
||||
|
||||
echo "Stopping milvus..."
|
||||
PROCESS=$(ps -e | grep milvus | grep -v grep | grep -v run-with-subprocess | awk '{print $1}')
|
||||
PROCESS=$(ps -e | grep milvus | grep -v grep | grep run-with-subprocess | awk '{print $1}')
|
||||
if [ -z "$PROCESS" ]; then
|
||||
echo "No milvus process"
|
||||
exit 0
|
||||
fi
|
||||
kill -9 $PROCESS
|
||||
kill -15 $PROCESS
|
||||
echo "Milvus stopped"
|
||||
|
||||
|
|
Loading…
Reference in New Issue