Merge pull request #4276 from tstromberg/proxy-restart2
Restart kube-proxy using kubeadm & add bootstrapper.WaitCluster
commit af049bd477
@@ -32,7 +32,6 @@ import (
	"github.com/blang/semver"
	"github.com/docker/machine/libmachine"
	"github.com/docker/machine/libmachine/host"
	"github.com/docker/machine/libmachine/state"
	"github.com/golang/glog"
	"github.com/google/go-containerregistry/pkg/authn"
	"github.com/google/go-containerregistry/pkg/name"
@@ -244,9 +243,6 @@ func runStart(cmd *cobra.Command, args []string) {
	// The kube config update must come before bootstrapping, otherwise health checks may use a stale IP
	kubeconfig := updateKubeConfig(host, &config)
	bootstrapCluster(bs, cr, runner, config.KubernetesConfig, preexisting, isUpgrade)

	apiserverPort := config.KubernetesConfig.NodePort
	validateCluster(bs, cr, runner, ip, apiserverPort)
	configureMounts()
	if err = LoadCachedImagesInConfigFile(); err != nil {
		console.Failure("Unable to load cached images from config file.")
@@ -257,6 +253,9 @@ func runStart(cmd *cobra.Command, args []string) {
		prepareNone()
	}

	if err := bs.WaitCluster(config.KubernetesConfig); err != nil {
		exit.WithError("Wait failed", err)
	}
	showKubectlConnectInfo(kubeconfig)

}
@@ -668,34 +667,6 @@ func bootstrapCluster(bs bootstrapper.Bootstrapper, r cruntime.Manager, runner b
	}
}

// validateCluster validates that the cluster is well-configured and healthy
func validateCluster(bs bootstrapper.Bootstrapper, r cruntime.Manager, runner bootstrapper.CommandRunner, ip string, apiserverPort int) {
	k8sStat := func() (err error) {
		st, err := bs.GetKubeletStatus()
		if err != nil || st != state.Running.String() {
			return &pkgutil.RetriableError{Err: fmt.Errorf("kubelet unhealthy: %v: %s", err, st)}
		}
		return nil
	}
	err := pkgutil.RetryAfter(20, k8sStat, 3*time.Second)
	if err != nil {
		exit.WithLogEntries("kubelet checks failed", err, logs.FindProblems(r, bs, runner))
	}
	aStat := func() (err error) {
		st, err := bs.GetAPIServerStatus(net.ParseIP(ip), apiserverPort)
		if err != nil || st != state.Running.String() {
			return &pkgutil.RetriableError{Err: fmt.Errorf("apiserver status=%s err=%v", st, err)}
		}
		return nil
	}

	err = pkgutil.RetryAfter(30, aStat, 10*time.Second)
	if err != nil {
		exit.WithLogEntries("apiserver checks failed", err, logs.FindProblems(r, bs, runner))
	}
	console.OutLn("")
}

// configureMounts configures any requested filesystem mounts
func configureMounts() {
	if !viper.GetBool(createMount) {
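The deleted validateCluster above was the old client-side health check in cmd/minikube: it polled the kubelet and apiserver with the RetriableError/RetryAfter helpers, and this PR moves the equivalent waiting behind bootstrapper.WaitCluster. The sketch below is not part of the commit; it only isolates that retry idiom, assuming pkgutil.RetryAfter and pkgutil.RetriableError behave as the deleted calls imply (retry up to n attempts, pausing between retriable failures), and waitForKubelet is a hypothetical name.

// Sketch only, not from this commit: the retry idiom used by the deleted validateCluster.
func waitForKubelet(bs bootstrapper.Bootstrapper) error {
	check := func() error {
		st, err := bs.GetKubeletStatus()
		if err != nil || st != state.Running.String() {
			// Returning a RetriableError tells RetryAfter to keep trying.
			return &pkgutil.RetriableError{Err: fmt.Errorf("kubelet unhealthy: %v: %s", err, st)}
		}
		return nil
	}
	return pkgutil.RetryAfter(20, check, 3*time.Second)
}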
@@ -39,6 +39,7 @@ type Bootstrapper interface {
	UpdateCluster(config.KubernetesConfig) error
	RestartCluster(config.KubernetesConfig) error
	DeleteCluster(config.KubernetesConfig) error
	WaitCluster(config.KubernetesConfig) error
	// LogCommands returns a map of log type to a command which will display that log.
	LogCommands(LogOptions) map[string]string
	SetupCerts(cfg config.KubernetesConfig) error
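WaitCluster is the one addition to the Bootstrapper interface; the rest of the hunk is context. A minimal caller sketch, assuming a bs that satisfies this interface; the helper name startAndWait is illustrative, not from the PR.

// Illustrative only: how a caller can drive a Bootstrapper now that waiting is
// part of the interface rather than an ad-hoc check in cmd/minikube.
func startAndWait(bs bootstrapper.Bootstrapper, cfg config.KubernetesConfig) error {
	if err := bs.UpdateCluster(cfg); err != nil {
		return err
	}
	if err := bs.RestartCluster(cfg); err != nil {
		return err
	}
	// New in this PR: block until the apiserver and core pods look healthy.
	return bs.WaitCluster(cfg)
}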
@@ -33,6 +33,7 @@ import (
	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/minikube/pkg/minikube/assets"
	"k8s.io/minikube/pkg/minikube/bootstrapper"
	"k8s.io/minikube/pkg/minikube/config"
@@ -70,7 +71,6 @@ type pod struct {

// PodsByLayer are queries we run when health checking, sorted roughly by dependency layer
var PodsByLayer = []pod{
	{"apiserver", "component", "kube-apiserver"},
	{"proxy", "k8s-app", "kube-proxy"},
	{"etcd", "component", "etcd"},
	{"scheduler", "component", "kube-scheduler"},
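Each PodsByLayer entry is turned into a label selector and handed to util.WaitForPodsWithLabelRunning inside WaitCluster later in this diff. A small self-contained sketch of that translation (illustrative, not from the commit); the printed query is exactly what you would pass to kubectl with -l.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// One PodsByLayer entry: {"proxy", "k8s-app", "kube-proxy"}
	key, value := "k8s-app", "kube-proxy"
	selector := labels.SelectorFromSet(labels.Set(map[string]string{key: value}))
	// Prints "k8s-app=kube-proxy", i.e. kubectl -n kube-system get pods -l k8s-app=kube-proxy
	fmt.Println(selector.String())
}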
@@ -214,20 +214,10 @@ func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
		}
	}

	if err := waitForPods(k8s, false); err != nil {
		return errors.Wrap(err, "wait")
	}

	glog.Infof("Configuring cluster permissions ...")
	if err := util.RetryAfter(100, elevateKubeSystemPrivileges, time.Millisecond*500); err != nil {
		return errors.Wrap(err, "timed out waiting to elevate kube-system RBAC privileges")
	}

	// Make sure elevating privileges didn't screw anything up
	if err := waitForPods(k8s, true); err != nil {
		return errors.Wrap(err, "wait")
	}

	return nil
}
@@ -260,37 +250,37 @@ func addAddons(files *[]assets.CopyableFile, data interface{}) error {
	return nil
}

// waitForPods waits until the important Kubernetes pods are in running state
func waitForPods(k8s config.KubernetesConfig, quiet bool) error {
// WaitCluster blocks until Kubernetes appears to be healthy.
func (k *Bootstrapper) WaitCluster(k8s config.KubernetesConfig) error {
	// Do not wait for "k8s-app" pods in the case of CNI, as they are managed
	// by a CNI plugin which is usually started after minikube has been brought
	// up. Otherwise, minikube won't start, as "k8s-app" pods are not ready.
	componentsOnly := k8s.NetworkPlugin == "cni"

	if !quiet {
		console.OutStyle("waiting-pods", "Waiting for:")
	}
	console.OutStyle("waiting-pods", "Verifying:")
	client, err := util.GetClient()
	if err != nil {
		return errors.Wrap(err, "k8s client")
	}

	// Wait until the apiserver can answer queries properly. We don't care if the apiserver
	// pod shows up as registered, but need the webserver for all subsequent queries.
	console.Out(" apiserver")
	if err := k.waitForAPIServer(k8s); err != nil {
		return errors.Wrap(err, "waiting for apiserver")
	}

	for _, p := range PodsByLayer {
		if componentsOnly && p.key != "component" {
			continue
		}

		if !quiet {
			console.Out(" %s", p.name)
		}
		console.Out(" %s", p.name)
		selector := labels.SelectorFromSet(labels.Set(map[string]string{p.key: p.value}))
		if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
			return errors.Wrap(err, fmt.Sprintf("waiting for %s=%s", p.key, p.value))
		}
	}
	if !quiet {
		console.OutLn("")
	}
	console.OutLn("")
	return nil
}
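The componentsOnly flag is why CNI-based clusters skip the kube-proxy entry: per the comment in the hunk above, "k8s-app" pods are managed by a CNI plugin that usually comes up after minikube itself. A runnable sketch of just that filtering rule (not part of the commit), with the layer list copied from the hunk above.

package main

import "fmt"

type pod struct{ name, key, value string }

var podsByLayer = []pod{
	{"apiserver", "component", "kube-apiserver"},
	{"proxy", "k8s-app", "kube-proxy"},
	{"etcd", "component", "etcd"},
	{"scheduler", "component", "kube-scheduler"},
}

func main() {
	componentsOnly := true // what WaitCluster sets when NetworkPlugin == "cni"
	for _, p := range podsByLayer {
		if componentsOnly && p.key != "component" {
			continue // kube-proxy is skipped; the CNI plugin brings it up later
		}
		fmt.Printf("would wait for %s (%s=%s)\n", p.name, p.key, p.value)
	}
}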
@@ -308,11 +298,13 @@ func (k *Bootstrapper) RestartCluster(k8s config.KubernetesConfig) error {
		controlPlane = "control-plane"
	}

	configPath := constants.KubeadmConfigFile
	baseCmd := fmt.Sprintf("sudo kubeadm %s", phase)
	cmds := []string{
		fmt.Sprintf("sudo kubeadm %s phase certs all --config %s", phase, constants.KubeadmConfigFile),
		fmt.Sprintf("sudo kubeadm %s phase kubeconfig all --config %s", phase, constants.KubeadmConfigFile),
		fmt.Sprintf("sudo kubeadm %s phase %s all --config %s", phase, controlPlane, constants.KubeadmConfigFile),
		fmt.Sprintf("sudo kubeadm %s phase etcd local --config %s", phase, constants.KubeadmConfigFile),
		fmt.Sprintf("%s phase certs all --config %s", baseCmd, configPath),
		fmt.Sprintf("%s phase kubeconfig all --config %s", baseCmd, configPath),
		fmt.Sprintf("%s phase %s all --config %s", baseCmd, controlPlane, configPath),
		fmt.Sprintf("%s phase etcd local --config %s", baseCmd, configPath),
	}

	// Run commands one at a time so that it is easier to root cause failures.
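The hunk ends at the "Run commands one at a time" comment, so the execution loop itself is not shown. A plausible sketch of what follows, using the k.c.Run command runner that appears elsewhere in this diff; the exact loop body and error wrapping are assumptions, not the commit's code.

	for _, cmd := range cmds {
		if err := k.c.Run(cmd); err != nil {
			// Failing fast per command makes it easier to root-cause a bad phase.
			return errors.Wrapf(err, "running cmd: %s", cmd)
		}
	}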
@@ -322,23 +314,32 @@ func (k *Bootstrapper) RestartCluster(k8s config.KubernetesConfig) error {
		}
	}

	if err := waitForPods(k8s, false); err != nil {
		return errors.Wrap(err, "wait")
	if err := k.waitForAPIServer(k8s); err != nil {
		return errors.Wrap(err, "waiting for apiserver")
	}

	console.OutStyle("reconfiguring", "Updating kube-proxy configuration ...")
	if err = util.RetryAfter(5, func() error { return updateKubeProxyConfigMap(k8s) }, 5*time.Second); err != nil {
		return errors.Wrap(err, "restarting kube-proxy")
	// restart the proxy and coredns
	if err := k.c.Run(fmt.Sprintf("%s phase addon all --config %s", baseCmd, configPath)); err != nil {
		return errors.Wrapf(err, "addon phase")
	}

	// Make sure the kube-proxy restart didn't screw anything up.
	if err := waitForPods(k8s, true); err != nil {
		return errors.Wrap(err, "wait")
	}

	return nil
}

// waitForAPIServer waits for the apiserver to start up
func (k *Bootstrapper) waitForAPIServer(k8s config.KubernetesConfig) error {
	glog.Infof("Waiting for apiserver ...")
	return wait.PollImmediate(time.Millisecond*200, time.Minute*1, func() (bool, error) {
		status, err := k.GetAPIServerStatus(net.ParseIP(k8s.NodeIP), k8s.NodePort)
		glog.Infof("status: %s, err: %v", status, err)
		if err != nil {
			return false, err
		}
		if status != "Running" {
			return false, nil
		}
		return true, nil
	})
}

// DeleteCluster removes the components that were started earlier
func (k *Bootstrapper) DeleteCluster(k8s config.KubernetesConfig) error {
	cmd := fmt.Sprintf("sudo kubeadm reset --force")
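One detail of wait.PollImmediate worth noting: returning a non-nil error from the condition function ends the poll immediately, so waitForAPIServer as written gives up on the first GetAPIServerStatus error rather than retrying it. If transient errors should be tolerated, the condition can swallow them and only surface the last one on timeout. A sketch of that alternative (not what this commit does), assuming the same fmt, time, and wait imports as the file above; waitRunning is a hypothetical name.

// waitRunning polls getStatus until it reports "Running", treating errors as
// "not ready yet" instead of aborting the poll.
func waitRunning(getStatus func() (string, error)) error {
	var lastErr error
	err := wait.PollImmediate(200*time.Millisecond, time.Minute, func() (bool, error) {
		status, err := getStatus()
		if err != nil {
			lastErr = err
			return false, nil // keep polling through transient failures
		}
		return status == "Running", nil
	})
	if err != nil && lastErr != nil {
		return fmt.Errorf("%v (last error: %v)", err, lastErr)
	}
	return err
}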
@@ -17,11 +17,8 @@ limitations under the License.
package kubeadm

import (
	"bytes"
	"encoding/json"
	"html/template"
	"net"
	"strings"

	"github.com/golang/glog"
	"github.com/pkg/errors"
@@ -29,10 +26,8 @@ import (
	rbac "k8s.io/api/rbac/v1beta1"
	apierr "k8s.io/apimachinery/pkg/api/errors"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/minikube/pkg/minikube/config"
	"k8s.io/minikube/pkg/minikube/constants"
	"k8s.io/minikube/pkg/minikube/service"
	"k8s.io/minikube/pkg/util"
@@ -130,98 +125,3 @@ func elevateKubeSystemPrivileges() error {
	}
	return nil
}

const (
	kubeconfigConf = "kubeconfig.conf"
	kubeProxyConfigmapTmpl = `apiVersion: v1
kind: Config
clusters:
- cluster:
    certificate-authority: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    server: https://{{.AdvertiseAddress}}:{{.APIServerPort}}
  name: default
contexts:
- context:
    cluster: default
    namespace: default
    user: default
  name: default
current-context: default
users:
- name: default
  user:
    tokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
`
)

// updateKubeProxyConfigMap updates the IP & port kube-proxy listens on, and restarts it.
func updateKubeProxyConfigMap(k8s config.KubernetesConfig) error {
	client, err := util.GetClient()
	if err != nil {
		return errors.Wrap(err, "getting k8s client")
	}

	selector := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "kube-proxy"}))
	if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
		return errors.Wrap(err, "kube-proxy not running")
	}

	cfgMap, err := client.CoreV1().ConfigMaps("kube-system").Get("kube-proxy", meta.GetOptions{})
	if err != nil {
		return &util.RetriableError{Err: errors.Wrap(err, "getting kube-proxy configmap")}
	}
	glog.Infof("kube-proxy config: %v", cfgMap.Data[kubeconfigConf])
	t := template.Must(template.New("kubeProxyTmpl").Parse(kubeProxyConfigmapTmpl))
	opts := struct {
		AdvertiseAddress string
		APIServerPort    int
	}{
		AdvertiseAddress: k8s.NodeIP,
		APIServerPort:    k8s.NodePort,
	}

	kubeconfig := bytes.Buffer{}
	if err := t.Execute(&kubeconfig, opts); err != nil {
		return errors.Wrap(err, "executing kube proxy configmap template")
	}

	if cfgMap.Data == nil {
		cfgMap.Data = map[string]string{}
	}

	updated := strings.TrimSuffix(kubeconfig.String(), "\n")
	glog.Infof("updated kube-proxy config: %s", updated)

	// An optimization, but also one that's unlikely, as kubeadm writes the address as 'localhost'
	if cfgMap.Data[kubeconfigConf] == updated {
		glog.Infof("kube-proxy config appears to require no change, not restarting kube-proxy")
		return nil
	}
	cfgMap.Data[kubeconfigConf] = updated

	// Make this step retriable, as it can fail with:
	// "Operation cannot be fulfilled on configmaps "kube-proxy": the object has been modified; please apply your changes to the latest version and try again"
	if _, err := client.CoreV1().ConfigMaps("kube-system").Update(cfgMap); err != nil {
		return &util.RetriableError{Err: errors.Wrap(err, "updating configmap")}
	}

	pods, err := client.CoreV1().Pods("kube-system").List(meta.ListOptions{
		LabelSelector: "k8s-app=kube-proxy",
	})
	if err != nil {
		return errors.Wrap(err, "listing kube-proxy pods")
	}
	for _, pod := range pods.Items {
		// Retriable, as known to fail with: pods "<name>" not found
		if err := client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &meta.DeleteOptions{}); err != nil {
			return &util.RetriableError{Err: errors.Wrapf(err, "deleting pod %+v", pod)}
		}
	}

	// Wait for the scheduler to restart kube-proxy
	if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
		return errors.Wrap(err, "kube-proxy not running")
	}

	return nil
}
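Everything from the const block through updateKubeProxyConfigMap is deleted: instead of templating a kubeconfig, patching the kube-proxy ConfigMap, and deleting its pods by hand, RestartCluster now lets kubeadm's "phase addon all" regenerate the kube-proxy (and coredns) configuration with the current apiserver address. For reference, a standalone sketch of what the deleted template rendering did; the sample address and port are made-up values, the template is trimmed to its clusters section, and it uses text/template rather than the html/template import the old file carried, since the output is plain YAML.

package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// Trimmed copy of the deleted kubeProxyConfigmapTmpl (clusters section only).
const kubeProxyConfigmapTmpl = `apiVersion: v1
kind: Config
clusters:
- cluster:
    certificate-authority: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    server: https://{{.AdvertiseAddress}}:{{.APIServerPort}}
  name: default
`

func main() {
	t := template.Must(template.New("kubeProxyTmpl").Parse(kubeProxyConfigmapTmpl))
	opts := struct {
		AdvertiseAddress string
		APIServerPort    int
	}{"192.168.99.100", 8443} // example values; minikube filled these from its node config
	var buf bytes.Buffer
	if err := t.Execute(&buf, opts); err != nil {
		panic(err)
	}
	fmt.Print(buf.String())
}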