Merge pull request #7260 from tstromberg/block-on-upgrade2

Wait for control-plane to upgrade before proceeding
pull/7261/head^2
Thomas Strömberg 2020-03-26 10:17:55 -07:00 committed by GitHub
commit 73bdb79ed5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 86 additions and 12 deletions

View File

@ -30,9 +30,11 @@ import (
"github.com/docker/machine/libmachine/state"
"github.com/golang/glog"
"github.com/pkg/errors"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/kubernetes"
kconst "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/minikube/pkg/minikube/bootstrapper"
@ -61,6 +63,7 @@ func WaitForAPIServerProcess(r cruntime.Manager, bs bootstrapper.Bootstrapper, c
if _, ierr := apiServerPID(cr); ierr != nil {
return false, nil
}
return true, nil
})
if err != nil {
@ -180,7 +183,7 @@ func WaitForSystemPods(r cruntime.Manager, bs bootstrapper.Bootstrapper, cfg con
}
// WaitForHealthyAPIServer waits for api server status to be running
func WaitForHealthyAPIServer(r cruntime.Manager, bs bootstrapper.Bootstrapper, cfg config.ClusterConfig, cr command.Runner, start time.Time, ip string, port int, timeout time.Duration) error {
func WaitForHealthyAPIServer(r cruntime.Manager, bs bootstrapper.Bootstrapper, cfg config.ClusterConfig, cr command.Runner, client *kubernetes.Clientset, start time.Time, ip string, port int, timeout time.Duration) error {
glog.Infof("waiting for apiserver healthz status ...")
hStart := time.Now()
@ -208,7 +211,35 @@ func WaitForHealthyAPIServer(r cruntime.Manager, bs bootstrapper.Bootstrapper, c
if err := wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, healthz); err != nil {
return fmt.Errorf("apiserver healthz never reported healthy")
}
glog.Infof("duration metric: took %s to wait for apiserver healthz status ...", time.Since(hStart))
vcheck := func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("cluster wait timed out during version check")
}
if err := APIServerVersionMatch(client, cfg.KubernetesConfig.KubernetesVersion); err != nil {
glog.Warningf("api server version match failed: %v", err)
return false, nil
}
return true, nil
}
if err := wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, vcheck); err != nil {
return fmt.Errorf("controlPlane never updated to %s", cfg.KubernetesConfig.KubernetesVersion)
}
glog.Infof("duration metric: took %s to wait for apiserver health ...", time.Since(hStart))
return nil
}
// APIServerVersionMatch checks that the reported apiserver version matches
// the expected Kubernetes version string.
//
// It queries the server's /version endpoint via the discovery client and
// compares the result against expected using kube-aware version semantics
// (which understand alpha/beta/GA ordering). Returns nil on a match, a
// wrapped error if the server version cannot be fetched, and a descriptive
// error when the versions differ.
func APIServerVersionMatch(client *kubernetes.Clientset, expected string) error {
	vi, err := client.ServerVersion()
	if err != nil {
		return errors.Wrap(err, "server version")
	}
	glog.Infof("control plane version: %s", vi)
	if version.CompareKubeAwareVersionStrings(vi.String(), expected) != 0 {
		// Fix: message previously said "controlPane"; keep it consistent
		// with the "controlPlane never updated" message used by callers.
		return fmt.Errorf("controlPlane = %q, expected: %q", vi.String(), expected)
	}
	return nil
}

View File

@ -56,6 +56,7 @@ import (
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/minikube/vmpath"
"k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/retry"
"k8s.io/minikube/pkg/version"
)
@ -251,6 +252,27 @@ func (k *Bootstrapper) init(cfg config.ClusterConfig) error {
return nil
}
// unpause unpauses any Kubernetes backplane components.
//
// It builds a container runtime handle for the configured runtime, lists any
// paused containers in the kube-system namespace, and unpauses them. A
// cluster with no paused components is a no-op success.
func (k *Bootstrapper) unpause(cfg config.ClusterConfig) error {
	runtime, err := cruntime.New(cruntime.Config{Type: cfg.KubernetesConfig.ContainerRuntime, Runner: k.c})
	if err != nil {
		return err
	}

	paused, err := runtime.ListContainers(cruntime.ListOptions{State: cruntime.Paused, Namespaces: []string{"kube-system"}})
	if err != nil {
		return errors.Wrap(err, "list paused")
	}

	// Nothing paused: nothing to do.
	if len(paused) == 0 {
		return nil
	}
	return runtime.UnpauseContainers(paused)
}
// StartCluster starts the cluster
func (k *Bootstrapper) StartCluster(cfg config.ClusterConfig) error {
start := time.Now()
@ -259,6 +281,11 @@ func (k *Bootstrapper) StartCluster(cfg config.ClusterConfig) error {
glog.Infof("StartCluster complete in %s", time.Since(start))
}()
// Before we start, ensure that no paused components are lurking around
if err := k.unpause(cfg); err != nil {
glog.Warningf("unpause failed: %v", err)
}
if err := bsutil.ExistingConfig(k.c); err == nil {
glog.Infof("found existing configuration files, will attempt cluster restart")
rerr := k.restartCluster(cfg)
@ -349,23 +376,23 @@ func (k *Bootstrapper) WaitForNode(cfg config.ClusterConfig, n config.Node, time
return err
}
if err := kverify.WaitForHealthyAPIServer(cr, k, cfg, k.c, start, ip, port, timeout); err != nil {
return err
}
c, err := k.client(ip, port)
client, err := k.client(ip, port)
if err != nil {
return errors.Wrap(err, "get k8s client")
}
if err := kverify.WaitForSystemPods(cr, k, cfg, k.c, c, start, timeout); err != nil {
if err := kverify.WaitForHealthyAPIServer(cr, k, cfg, k.c, client, start, ip, port, timeout); err != nil {
return err
}
if err := kverify.WaitForSystemPods(cr, k, cfg, k.c, client, start, timeout); err != nil {
return errors.Wrap(err, "waiting for system pods")
}
return nil
}
// needsReset returns whether or not the cluster needs to be reconfigured
func (k *Bootstrapper) needsReset(conf string, ip string, port int, client *kubernetes.Clientset) bool {
func (k *Bootstrapper) needsReset(conf string, ip string, port int, client *kubernetes.Clientset, version string) bool {
if rr, err := k.c.RunCmd(exec.Command("sudo", "diff", "-u", conf, conf+".new")); err != nil {
glog.Infof("needs reset: configs differ:\n%s", rr.Output())
return true
@ -386,6 +413,12 @@ func (k *Bootstrapper) needsReset(conf string, ip string, port int, client *kube
glog.Infof("needs reset: %v", err)
return true
}
if err := kverify.APIServerVersionMatch(client, version); err != nil {
glog.Infof("needs reset: %v", err)
return true
}
return false
}
@ -426,7 +459,7 @@ func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
// If the cluster is running, check if we have any work to do.
conf := bsutil.KubeadmYamlPath
if !k.needsReset(conf, ip, port, client) {
if !k.needsReset(conf, ip, port, client, cfg.KubernetesConfig.KubernetesVersion) {
glog.Infof("Taking a shortcut, as the cluster seems to be properly configured")
return nil
}
@ -466,12 +499,22 @@ func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
return errors.Wrap(err, "apiserver healthz")
}
if err := kverify.WaitForHealthyAPIServer(cr, k, cfg, k.c, client, time.Now(), ip, port, kconst.DefaultControlPlaneTimeout); err != nil {
return errors.Wrap(err, "apiserver health")
}
if err := kverify.WaitForSystemPods(cr, k, cfg, k.c, client, time.Now(), kconst.DefaultControlPlaneTimeout); err != nil {
return errors.Wrap(err, "system pods")
}
if rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s phase addon all --config %s", baseCmd, conf))); err != nil {
return errors.Wrapf(err, fmt.Sprintf("addon phase cmd:%q", rr.Command()))
// This can fail during upgrades if the old pods have not shut down yet
addonPhase := func() error {
_, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s phase addon all --config %s", baseCmd, conf)))
return err
}
if err = retry.Expo(addonPhase, 1*time.Second, 30*time.Second); err != nil {
glog.Warningf("addon install failed, wil retry: %v", err)
return errors.Wrap(err, "addons")
}
if err := bsutil.AdjustResourceLimits(k.c); err != nil {