Merge pull request #7973 from sharifelgamal/restart

Make sure multinode clusters can survive restarts
Sharif Elgamal 2020-05-27 17:41:57 -07:00 committed by GitHub
commit 4f0613929f
10 changed files with 161 additions and 41 deletions

View File

@@ -286,27 +286,44 @@ func startWithDriver(starter node.Starter, existing *config.ClusterConfig) (*kub
}
numNodes := viper.GetInt(nodes)
if numNodes == 1 && existing != nil {
if existing != nil {
if numNodes > 1 {
// We ignore the --nodes parameter if we're restarting an existing cluster
out.WarningT(`The cluster {{.cluster}} already exists which means the --nodes parameter will be ignored. Use "minikube node add" to add nodes to an existing cluster.`, out.V{"cluster": existing.Name})
}
numNodes = len(existing.Nodes)
}
if numNodes > 1 {
if driver.BareMetal(starter.Cfg.Driver) {
exit.WithCodeT(exit.Config, "The none driver is not compatible with multi-node clusters.")
} else {
out.Ln("")
warnAboutMultiNode()
for i := 1; i < numNodes; i++ {
nodeName := node.Name(i + 1)
n := config.Node{
Name: nodeName,
Worker: true,
ControlPlane: false,
KubernetesVersion: starter.Cfg.KubernetesConfig.KubernetesVersion,
// Only warn users on first start.
if existing == nil {
out.Ln("")
warnAboutMultiNode()
for i := 1; i < numNodes; i++ {
nodeName := node.Name(i + 1)
n := config.Node{
Name: nodeName,
Worker: true,
ControlPlane: false,
KubernetesVersion: starter.Cfg.KubernetesConfig.KubernetesVersion,
}
out.Ln("") // extra newline for clarity on the command line
err := node.Add(starter.Cfg, n)
if err != nil {
return nil, errors.Wrap(err, "adding node")
}
}
out.Ln("") // extra newline for clarity on the command line
err := node.Add(starter.Cfg, n)
if err != nil {
return nil, errors.Wrap(err, "adding node")
} else {
for _, n := range existing.Nodes {
if !n.ControlPlane {
err := node.Add(starter.Cfg, n)
if err != nil {
return nil, errors.Wrap(err, "adding node")
}
}
}
}
}
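For readers skimming the diff, the hunk above is the core restart fix in startWithDriver: an existing cluster's saved node list overrides the --nodes flag, and on restart the saved worker nodes are re-added rather than created from scratch. Below is a minimal standalone sketch of that decision, with simplified types and invented helper names (resolveNodeCount and workersToAdd are illustrative, not real minikube functions):

```go
package main

import "fmt"

type Node struct {
	Name         string
	ControlPlane bool
}

type ClusterConfig struct {
	Name  string
	Nodes []Node
}

// resolveNodeCount mirrors the new behavior: when a cluster already exists,
// its saved node list wins and the --nodes flag is ignored (with a warning).
func resolveNodeCount(requested int, existing *ClusterConfig) int {
	if existing != nil {
		if requested > 1 {
			fmt.Printf("cluster %q already exists, ignoring --nodes\n", existing.Name)
		}
		return len(existing.Nodes)
	}
	return requested
}

// workersToAdd mirrors the two paths in the hunk: a fresh cluster gets brand
// new worker nodes, a restarted cluster re-adds the workers saved in its config.
func workersToAdd(numNodes int, existing *ClusterConfig) []Node {
	var workers []Node
	if existing == nil {
		for i := 1; i < numNodes; i++ {
			workers = append(workers, Node{Name: fmt.Sprintf("m%02d", i+1)})
		}
		return workers
	}
	for _, n := range existing.Nodes {
		if !n.ControlPlane {
			workers = append(workers, n)
		}
	}
	return workers
}

func main() {
	existing := &ClusterConfig{
		Name: "multinode-demo",
		Nodes: []Node{
			{Name: "m01", ControlPlane: true},
			{Name: "m02"},
			{Name: "m03"},
		},
	}
	fmt.Println(resolveNodeCount(5, existing))                // 3: the saved node count wins
	fmt.Println(workersToAdd(len(existing.Nodes), existing))  // re-adds m02 and m03
	fmt.Println(workersToAdd(3, nil))                         // two brand-new workers on first start
}
```

Resolving the node count from the saved config first is what makes a second `minikube start` on a multi-node profile idempotent instead of piling on extra nodes.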

View File

@@ -232,7 +232,7 @@ func status(api libmachine.API, cc config.ClusterConfig, n config.Node) (*Status
glog.Infof("%s kubelet status = %s", name, stk)
st.Kubelet = stk.String()
// Early exit for regular nodes
// Early exit for worker nodes
if !controlPlane {
return st, nil
}
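The comment tweak above reflects that per-node status short-circuits for workers: only the control plane gets apiserver and kubeconfig checks. A simplified sketch (the Status fields here stand in for minikube's real status struct), which also explains why the integration tests later in this PR count `host: Running` and `kubelet: Running` lines per node rather than apiserver lines:

```go
package main

import "fmt"

// Status is a simplified stand-in for minikube's per-node status struct.
type Status struct {
	Host       string
	Kubelet    string
	APIServer  string
	Kubeconfig string
}

// nodeStatus mirrors the early exit in the hunk above: every node reports a
// host and kubelet state, but only the control plane is checked any further.
func nodeStatus(controlPlane bool) Status {
	st := Status{Host: "Running", Kubelet: "Running"}
	if !controlPlane {
		// Early exit for worker nodes: no apiserver or kubeconfig checks.
		return st
	}
	st.APIServer = "Running"
	st.Kubeconfig = "Configured"
	return st
}

func main() {
	fmt.Printf("control plane: %+v\n", nodeStatus(true))
	fmt.Printf("worker:        %+v\n", nodeStatus(false))
}
```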

View File

@@ -105,7 +105,7 @@ func WaitForHealthyAPIServer(r cruntime.Manager, bs bootstrapper.Bootstrapper, c
}
if err := wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, healthz); err != nil {
return fmt.Errorf("apiserver healthz never reported healthy")
return fmt.Errorf("apiserver healthz never reported healthy: %v", err)
}
vcheck := func() (bool, error) {

View File

@@ -302,7 +302,7 @@ func (k *Bootstrapper) StartCluster(cfg config.ClusterConfig) error {
if err := bsutil.ExistingConfig(k.c); err == nil {
glog.Infof("found existing configuration files, will attempt cluster restart")
rerr := k.restartCluster(cfg)
rerr := k.restartControlPlane(cfg)
if rerr == nil {
return nil
}
@@ -484,7 +484,7 @@ func (k *Bootstrapper) needsReconfigure(conf string, hostname string, port int,
}
// restartCluster restarts the Kubernetes cluster configured by kubeadm
func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
func (k *Bootstrapper) restartControlPlane(cfg config.ClusterConfig) error {
glog.Infof("restartCluster start")
start := time.Now()
@@ -605,10 +605,24 @@ func (k *Bootstrapper) JoinCluster(cc config.ClusterConfig, n config.Node, joinC
}()
// Join the master by specifying its token
joinCmd = fmt.Sprintf("%s --v=10 --node-name=%s", joinCmd, driver.MachineName(cc, n))
out, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", joinCmd))
if err != nil {
return errors.Wrapf(err, "cmd failed: %s\n%+v\n", joinCmd, out)
joinCmd = fmt.Sprintf("%s --node-name=%s", joinCmd, driver.MachineName(cc, n))
join := func() error {
// reset first to clear any possibly existing state
_, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s reset -f", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion))))
if err != nil {
glog.Infof("kubeadm reset failed, continuing anyway: %v", err)
}
out, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", joinCmd))
if err != nil {
return errors.Wrapf(err, "cmd failed: %s\n%+v\n", joinCmd, out.Output())
}
return nil
}
if err := retry.Expo(join, 10*time.Second, 1*time.Minute); err != nil {
return errors.Wrap(err, "joining cp")
}
if _, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", "sudo systemctl daemon-reload && sudo systemctl enable kubelet && sudo systemctl start kubelet")); err != nil {
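The join path above now tolerates flaky first attempts: it runs `kubeadm reset -f` to clear any half-joined state, then re-runs the join command under exponential backoff via retry.Expo. A rough standalone sketch of that retry pattern, where retryExpo is only an illustrative stand-in for minikube's retry.Expo helper:

```go
package main

import (
	"fmt"
	"time"
)

// retryExpo retries fn with doubling sleeps until maxTime has elapsed.
// It is an illustrative stand-in, not minikube's actual retry.Expo.
func retryExpo(fn func() error, initial, maxTime time.Duration) error {
	deadline := time.Now().Add(maxTime)
	sleep := initial
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if time.Now().Add(sleep).After(deadline) {
			return fmt.Errorf("retry deadline exceeded: %w", err)
		}
		time.Sleep(sleep)
		sleep *= 2
	}
}

func main() {
	attempts := 0
	join := func() error {
		// The real closure first runs "kubeadm reset -f" so a half-joined
		// node starts from a clean slate, then re-runs the join command.
		attempts++
		if attempts < 3 {
			return fmt.Errorf("apiserver not reachable yet")
		}
		return nil
	}
	if err := retryExpo(join, 100*time.Millisecond, 5*time.Second); err != nil {
		fmt.Println("join failed:", err)
		return
	}
	fmt.Printf("joined after %d attempts\n", attempts)
}
```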
@@ -618,17 +632,21 @@ func (k *Bootstrapper) JoinCluster(cc config.ClusterConfig, n config.Node, joinC
return nil
}
// GenerateToken creates a token and returns the appropriate kubeadm join command to run
// GenerateToken creates a token and returns the appropriate kubeadm join command to run, or the already existing token
func (k *Bootstrapper) GenerateToken(cc config.ClusterConfig) (string, error) {
// Take that generated token and use it to get a kubeadm join command
tokenCmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("%s token create --print-join-command --ttl=0", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion)))
r, err := k.c.RunCmd(tokenCmd)
if err != nil {
return "", errors.Wrap(err, "generating bootstrap token")
return "", errors.Wrap(err, "generating join command")
}
joinCmd := r.Stdout.String()
joinCmd = strings.Replace(joinCmd, "kubeadm", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion), 1)
joinCmd = fmt.Sprintf("%s --ignore-preflight-errors=all", strings.TrimSpace(joinCmd))
if cc.KubernetesConfig.CRISocket != "" {
joinCmd = fmt.Sprintf("%s --cri-socket %s", joinCmd, cc.KubernetesConfig.CRISocket)
}
return joinCmd, nil
}
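GenerateToken shells out to `kubeadm token create --print-join-command` and then rewrites the printed command: the bare `kubeadm` is swapped for the version-matched binary, preflight errors are ignored, and the CRI socket is appended when one is configured. A hedged sketch of that string massaging, with invokeKubeadm standing in for bsutil.InvokeKubeadm and a made-up line of kubeadm output:

```go
package main

import (
	"fmt"
	"strings"
)

// invokeKubeadm is a stand-in for bsutil.InvokeKubeadm: it points at the
// version-matched kubeadm binary that minikube ships onto the node.
func invokeKubeadm(version string) string {
	return "/var/lib/minikube/binaries/" + version + "/kubeadm"
}

// buildJoinCmd shows the string massaging done above: swap in the versioned
// binary, relax preflight checks, and pass the CRI socket when one is set.
func buildJoinCmd(rawJoin, k8sVersion, criSocket string) string {
	joinCmd := strings.Replace(rawJoin, "kubeadm", invokeKubeadm(k8sVersion), 1)
	joinCmd = fmt.Sprintf("%s --ignore-preflight-errors=all", strings.TrimSpace(joinCmd))
	if criSocket != "" {
		joinCmd = fmt.Sprintf("%s --cri-socket %s", joinCmd, criSocket)
	}
	return joinCmd
}

func main() {
	// Representative (made-up) output of "kubeadm token create --print-join-command".
	raw := "kubeadm join 192.168.49.2:8443 --token abcdef.0123456789abcdef --discovery-token-ca-cert-hash sha256:1234\n"
	fmt.Println(buildJoinCmd(raw, "v1.18.3", ""))
	fmt.Println(buildJoinCmd(raw, "v1.18.3", "/var/run/crio/crio.sock"))
}
```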
@@ -743,14 +761,18 @@ func (k *Bootstrapper) UpdateNode(cfg config.ClusterConfig, n config.Node, r cru
}
files := []assets.CopyableFile{
assets.NewMemoryAssetTarget(kubeadmCfg, bsutil.KubeadmYamlPath+".new", "0640"),
assets.NewMemoryAssetTarget(kubeletCfg, bsutil.KubeletSystemdConfFile, "0644"),
assets.NewMemoryAssetTarget(kubeletService, bsutil.KubeletServiceFile, "0644"),
}
if n.ControlPlane {
files = append(files, assets.NewMemoryAssetTarget(kubeadmCfg, bsutil.KubeadmYamlPath+".new", "0640"))
}
// Copy the default CNI config (k8s.conf), so that kubelet can successfully
// start a Pod in the case a user hasn't manually installed any CNI plugin
// and minikube was started with "--extra-config=kubelet.network-plugin=cni".
if cfg.KubernetesConfig.EnableDefaultCNI {
if cfg.KubernetesConfig.EnableDefaultCNI && !config.MultiNode(cfg) {
files = append(files, assets.NewMemoryAssetTarget([]byte(defaultCNIConfig), bsutil.DefaultCNIConfigPath, "0644"))
}
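The UpdateNode change above makes the copied files role-aware: every node gets the kubelet config and service unit, only the control plane gets kubeadm.yaml, and the bundled default CNI config is skipped for multi-node clusters, which need a real CNI plugin. A toy sketch of that selection, using plain labels instead of minikube's real asset targets:

```go
package main

import "fmt"

// filesForNode mirrors the per-node selection above, with label strings in
// place of minikube's real asset targets and on-host paths.
func filesForNode(controlPlane, enableDefaultCNI, multiNode bool) []string {
	files := []string{"kubelet systemd conf", "kubelet.service"}
	if controlPlane {
		files = append(files, "kubeadm.yaml.new")
	}
	// The bundled default CNI config is only useful for single-node clusters;
	// multi-node clusters are expected to install a real CNI plugin instead.
	if enableDefaultCNI && !multiNode {
		files = append(files, "default CNI config (k8s.conf)")
	}
	return files
}

func main() {
	fmt.Println(filesForNode(true, true, false)) // single-node control plane
	fmt.Println(filesForNode(true, true, true))  // multi-node control plane: no default CNI
	fmt.Println(filesForNode(false, true, true)) // multi-node worker: kubelet files only
}
```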

View File

@@ -136,7 +136,11 @@ func New(c Config) (Manager, error) {
switch c.Type {
case "", "docker":
return &Docker{Socket: c.Socket, Runner: c.Runner, Init: sm}, nil
return &Docker{
Socket: c.Socket,
Runner: c.Runner,
Init: sm,
}, nil
case "crio", "cri-o":
return &CRIO{
Socket: c.Socket,

View File

@@ -196,8 +196,8 @@ func CacheAndLoadImages(images []string) error {
status, err := Status(api, m)
if err != nil {
glog.Warningf("error getting status for %s: %v", pName, err)
failed = append(failed, pName)
glog.Warningf("error getting status for %s: %v", m, err)
failed = append(failed, m)
continue
}
@@ -205,7 +205,7 @@ func CacheAndLoadImages(images []string) error {
h, err := api.Load(m)
if err != nil {
glog.Warningf("Failed to load machine %q: %v", m, err)
failed = append(failed, pName)
failed = append(failed, m)
continue
}
cr, err := CommandRunner(h)
@@ -214,10 +214,10 @@ func CacheAndLoadImages(images []string) error {
}
err = LoadImages(c, cr, images, constants.ImageCacheDir)
if err != nil {
failed = append(failed, pName)
failed = append(failed, m)
glog.Warningf("Failed to load cached images for profile %s. make sure the profile is running. %v", pName, err)
}
succeeded = append(succeeded, pName)
succeeded = append(succeeded, m)
}
}
}
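The pName to m swaps above matter because one profile owns several machines in a multi-node cluster, so logging the profile name hides which machine actually failed to load its cached images. An illustrative sketch of that relationship (machineNames is not a real minikube helper; the "-m02" style suffix just matches the mXX node naming used in the hunks above):

```go
package main

import "fmt"

// machineNames illustrates that one profile maps to several machine names in
// a multi-node cluster.
func machineNames(profile string, nodes int) []string {
	names := []string{profile} // the first (control plane) machine reuses the profile name
	for i := 2; i <= nodes; i++ {
		names = append(names, fmt.Sprintf("%s-m%02d", profile, i))
	}
	return names
}

func main() {
	for _, m := range machineNames("multinode-demo", 3) {
		// Logging the machine name (m) instead of the profile name makes it
		// clear which node failed to load its cached images.
		fmt.Printf("loading cached images into %s\n", m)
	}
}
```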

View File

@@ -111,7 +111,7 @@ func recreateIfNeeded(api libmachine.API, cc *config.ClusterConfig, n *config.No
}
if !me || err == constants.ErrMachineMissing {
out.T(out.Shrug, `{{.driver_name}} "{{.cluster}}" {{.machine_type}} is missing, will recreate.`, out.V{"driver_name": cc.Driver, "cluster": cc.Name, "machine_type": machineType})
out.T(out.Shrug, `{{.driver_name}} "{{.cluster}}" {{.machine_type}} is missing, will recreate.`, out.V{"driver_name": cc.Driver, "cluster": machineName, "machine_type": machineType})
demolish(api, *cc, *n, h)
glog.Infof("Sleeping 1 second for extra luck!")
@@ -133,13 +133,13 @@ func recreateIfNeeded(api libmachine.API, cc *config.ClusterConfig, n *config.No
if s == state.Running {
if !recreated {
out.T(out.Running, `Updating the running {{.driver_name}} "{{.cluster}}" {{.machine_type}} ...`, out.V{"driver_name": cc.Driver, "cluster": cc.Name, "machine_type": machineType})
out.T(out.Running, `Updating the running {{.driver_name}} "{{.cluster}}" {{.machine_type}} ...`, out.V{"driver_name": cc.Driver, "cluster": machineName, "machine_type": machineType})
}
return h, nil
}
if !recreated {
out.T(out.Restarting, `Restarting existing {{.driver_name}} {{.machine_type}} for "{{.cluster}}" ...`, out.V{"driver_name": cc.Driver, "cluster": cc.Name, "machine_type": machineType})
out.T(out.Restarting, `Restarting existing {{.driver_name}} {{.machine_type}} for "{{.cluster}}" ...`, out.V{"driver_name": cc.Driver, "cluster": machineName, "machine_type": machineType})
}
if err := h.Driver.Start(); err != nil {
return h, errors.Wrap(err, "driver start")
@@ -147,6 +147,7 @@ func recreateIfNeeded(api libmachine.API, cc *config.ClusterConfig, n *config.No
if err := saveHost(api, h, cc, n); err != nil {
return h, err
}
return h, nil
}

View File

@@ -168,7 +168,6 @@ func createHost(api libmachine.API, cfg *config.ClusterConfig, n *config.Node) (
if err := saveHost(api, h, cfg, n); err != nil {
return h, err
}
return h, nil
}

View File

@@ -21,7 +21,6 @@ import (
"net"
"os"
"os/exec"
"runtime/debug"
"strconv"
"strings"
"sync"
@@ -128,6 +127,7 @@ func Start(starter Starter, apiServer bool) (*kubeconfig.Settings, error) {
if err = bs.SetupCerts(starter.Cfg.KubernetesConfig, *starter.Node); err != nil {
return nil, errors.Wrap(err, "setting up certs")
}
}
var wg sync.WaitGroup
@@ -156,6 +156,7 @@ func Start(starter Starter, apiServer bool) (*kubeconfig.Settings, error) {
if err := bs.WaitForNode(*starter.Cfg, *starter.Node, viper.GetDuration(waitTimeout)); err != nil {
return nil, errors.Wrap(err, "Wait failed")
}
} else {
if err := bs.UpdateNode(*starter.Cfg, *starter.Node, cr); err != nil {
return nil, errors.Wrap(err, "Updating node")
@@ -251,7 +252,6 @@ func configureRuntimes(runner cruntime.CommandRunner, cc config.ClusterConfig, k
err = cr.Enable(disableOthers, forceSystemd())
if err != nil {
debug.PrintStack()
exit.WithError("Failed to enable container runtime", err)
}

View File

@@ -46,6 +46,8 @@ func TestMultiNode(t *testing.T) {
{"StopNode", validateStopRunningNode},
{"StartAfterStop", validateStartNodeAfterStop},
{"DeleteNode", validateDeleteNodeFromMultiNode},
{"StopMultiNode", validateStopMultiNodeCluster},
{"RestartMultiNode", validateRestartMultiNodeCluster},
}
for _, tc := range tests {
tc := tc
@@ -138,12 +140,20 @@ func validateStopRunningNode(ctx context.Context, t *testing.T, profile string)
}
func validateStartNodeAfterStop(ctx context.Context, t *testing.T, profile string) {
// TODO (#7496): remove skip once restarts work
t.Skip("Restarting nodes is broken :(")
if DockerDriver() {
rr, err := Run(t, exec.Command("docker", "version", "-f", "{{.Server.Version}}"))
if err != nil {
t.Fatalf("docker is broken: %v", err)
}
if strings.Contains(rr.Stdout.String(), "azure") {
t.Skip("kic containers are not supported on docker's azure")
}
}
// Start the node back up
rr, err := Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "node", "start", ThirdNodeName))
rr, err := Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "node", "start", ThirdNodeName, "--alsologtostderr"))
if err != nil {
t.Logf(rr.Stderr.String())
t.Errorf("node start returned an error. args %q: %v", rr.Command(), err)
}
@@ -160,6 +170,73 @@ func validateStartNodeAfterStop(ctx context.Context, t *testing.T, profile strin
if strings.Count(rr.Stdout.String(), "kubelet: Running") != 3 {
t.Errorf("status says both kubelets are not running: args %q: %v", rr.Command(), rr.Stdout.String())
}
// Make sure kubectl can connect correctly
rr, err = Run(t, exec.CommandContext(ctx, "kubectl", "get", "nodes"))
if err != nil {
t.Fatalf("failed to kubectl get nodes. args %q : %v", rr.Command(), err)
}
}
func validateStopMultiNodeCluster(ctx context.Context, t *testing.T, profile string) {
// Run minikube node stop on that node
rr, err := Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "stop"))
if err != nil {
t.Errorf("node stop returned an error. args %q: %v", rr.Command(), err)
}
// Run status to see the stopped hosts
rr, err = Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "status"))
// Exit code 7 means one host is stopped, which we are expecting
if err != nil && rr.ExitCode != 7 {
t.Fatalf("failed to run minikube status. args %q : %v", rr.Command(), err)
}
// Make sure minikube status shows 2 stopped nodes
rr, err = Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "status", "--alsologtostderr"))
if err != nil && rr.ExitCode != 7 {
t.Fatalf("failed to run minikube status. args %q : %v", rr.Command(), err)
}
if strings.Count(rr.Stdout.String(), "host: Stopped") != 2 {
t.Errorf("incorrect number of stopped hosts: args %q: %v", rr.Command(), rr.Stdout.String())
}
if strings.Count(rr.Stdout.String(), "kubelet: Stopped") != 2 {
t.Errorf("incorrect number of stopped kubelets: args %q: %v", rr.Command(), rr.Stdout.String())
}
}
func validateRestartMultiNodeCluster(ctx context.Context, t *testing.T, profile string) {
if DockerDriver() {
rr, err := Run(t, exec.Command("docker", "version", "-f", "{{.Server.Version}}"))
if err != nil {
t.Fatalf("docker is broken: %v", err)
}
if strings.Contains(rr.Stdout.String(), "azure") {
t.Skip("kic containers are not supported on docker's azure")
}
}
// Restart a full cluster with minikube start
startArgs := append([]string{"start", "-p", profile}, StartArgs()...)
rr, err := Run(t, exec.CommandContext(ctx, Target(), startArgs...))
if err != nil {
t.Fatalf("failed to start cluster. args %q : %v", rr.Command(), err)
}
// Make sure minikube status shows 2 running nodes
rr, err = Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "status", "--alsologtostderr"))
if err != nil {
t.Fatalf("failed to run minikube status. args %q : %v", rr.Command(), err)
}
if strings.Count(rr.Stdout.String(), "host: Running") != 2 {
t.Errorf("status says both hosts are not running: args %q: %v", rr.Command(), rr.Stdout.String())
}
if strings.Count(rr.Stdout.String(), "kubelet: Running") != 2 {
t.Errorf("status says both kubelets are not running: args %q: %v", rr.Command(), rr.Stdout.String())
}
}
func validateDeleteNodeFromMultiNode(ctx context.Context, t *testing.T, profile string) {