move verify clusters to its own package

pull/6151/head
Medya Gh 2019-12-21 17:13:08 -08:00
parent af1b77a164
commit 08c715af8b
7 changed files with 270 additions and 163 deletions

1
go.sum
View File

@ -724,6 +724,7 @@ k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68=
k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.3.3 h1:niceAagH1tzskmaie/icWd7ci1wbG7Bf2c6YGcQv+3c=
k8s.io/klog v0.3.3/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kubernetes v1.15.2 h1:RO9EuRw5vlN3oa/lnmPxmywOoJRtg9o40KcklHXNIAQ=

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeadm
package bsutil
import (
"net"
@ -32,9 +32,9 @@ const (
rbacName = "minikube-rbac"
)
// elevateKubeSystemPrivileges gives the kube-system service account
// ElevateKubeSystemPrivileges gives the kube-system service account
// cluster admin privileges to work with RBAC.
func elevateKubeSystemPrivileges(client kubernetes.Interface) error {
func ElevateKubeSystemPrivileges(client kubernetes.Interface) error {
start := time.Now()
clusterRoleBinding := &rbac.ClusterRoleBinding{
ObjectMeta: meta.ObjectMeta{

View File

@ -0,0 +1,29 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package bsutil will eventually be renamed to kubeadm package after getting rid of older one
package bsutil
// SkipAdditionalPreflights are additional preflights we skip depending on the runtime in use.
var SkipAdditionalPreflights = map[string][]string{}
// ExpectedRemoteArtifacts remote artifacts that must exist for minikube to function properly. The sign of a previously working installation.
// NOTE: /etc is not persistent across restarts, so don't bother checking there
var ExpectedRemoteArtifacts = []string{
"/var/lib/kubelet/kubeadm-flags.env",
"/var/lib/kubelet/config.yaml",
EtcdDataDir(),
}

View File

@ -0,0 +1,140 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package verify takes care of making sure a running kubernetes cluster is healthy
package verify
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"os/exec"
"time"
"github.com/docker/machine/libmachine/state"
"github.com/golang/glog"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
kconst "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/minikube/pkg/minikube/command"
"k8s.io/minikube/pkg/minikube/config"
)
// APIServerProcess waits for api server to be healthy returns error if it doesn't
func APIServerProcess(runner command.Runner, start time.Time, timeout time.Duration) error {
glog.Infof("waiting for apiserver process to appear ...")
err := wait.PollImmediate(time.Second*1, timeout, func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("cluster wait timed out during process check")
}
rr, ierr := runner.RunCmd(exec.Command("sudo", "pgrep", "kube-apiserver"))
if ierr != nil {
glog.Warningf("pgrep apiserver: %v cmd: %s", ierr, rr.Command())
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("apiserver process never appeared")
}
glog.Infof("duration metric: took %s to wait for apiserver process to appear ...", time.Since(start))
return nil
}
// SystemPods verifies essential pods for running kurnetes is running
func SystemPods(client *kubernetes.Clientset, start time.Time, k8s config.KubernetesConfig, timeout time.Duration) error {
glog.Infof("waiting for kube-system pods to appear ...")
pStart := time.Now()
podStart := time.Time{}
podList := func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("cluster wait timed out during pod check")
}
// Wait for any system pod, as waiting for apiserver may block until etcd
pods, err := client.CoreV1().Pods("kube-system").List(meta.ListOptions{})
if len(pods.Items) < 2 {
podStart = time.Time{}
return false, nil
}
if err != nil {
podStart = time.Time{}
return false, nil
}
if podStart.IsZero() {
podStart = time.Now()
}
glog.Infof("%d kube-system pods found since %s", len(pods.Items), podStart)
if time.Since(podStart) > 2*kconst.APICallRetryInterval {
glog.Infof("stability requirement met, returning")
return true, nil
}
return false, nil
}
if err := wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, podList); err != nil {
return fmt.Errorf("apiserver never returned a pod list")
}
glog.Infof("duration metric: took %s to wait for pod list to return data ...", time.Since(pStart))
return nil
}
func APIServerHealthz(start time.Time, k8s config.KubernetesConfig, timeout time.Duration) error {
glog.Infof("waiting for apiserver healthz status ...")
hStart := time.Now()
healthz := func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("cluster wait timed out during healthz check")
}
status, err := APIServerStatus(net.ParseIP(k8s.NodeIP), k8s.NodePort)
if err != nil {
glog.Warningf("status: %v", err)
return false, nil
}
if status != "Running" {
return false, nil
}
return true, nil
}
if err := wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, healthz); err != nil {
return fmt.Errorf("apiserver healthz never reported healthy")
}
glog.Infof("duration metric: took %s to wait for apiserver healthz status ...", time.Since(hStart))
return nil
}
func APIServerStatus(ip net.IP, apiserverPort int) (string, error) {
url := fmt.Sprintf("https://%s:%d/healthz", ip, apiserverPort)
// To avoid: x509: certificate signed by unknown authority
tr := &http.Transport{
Proxy: nil, // To avoid connectiv issue if http(s)_proxy is set.
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{Transport: tr}
resp, err := client.Get(url)
// Connection refused, usually.
if err != nil {
return state.Stopped.String(), nil
}
if resp.StatusCode != http.StatusOK {
glog.Warningf("%s response: %v %+v", url, err, resp)
return state.Error.String(), nil
}
return state.Running.String(), nil
}

View File

@ -145,8 +145,20 @@ func (k *Bootstrapper) PullImages(k8s config.KubernetesConfig) error {
return nil
}
func (k *Bootstrapper) StartCluster(config.KubernetesConfig) error {
return fmt.Errorf("the StartCluster is not implemented in kicbs yet")
// StartCluster starts the cluster
func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
return nil
}
func (k *Bootstrapper) existingConfig() error {
args := append([]string{"ls"}, bsutil.ExpectedRemoteArtifacts...)
_, err := k.c.RunCmd(exec.Command("sudo", args...))
return err
}
// restartCluster restarts the Kubernetes cluster configured by kubeadm
func (k *Bootstrapper) restartCluster(k8s config.KubernetesConfig) error {
return fmt.Errorf("the restartCluster is not implemented in kicbs yet")
}
func (k *Bootstrapper) DeleteCluster(config.KubernetesConfig) error {

View File

@ -0,0 +1,51 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeadm
import (
"github.com/pkg/errors"
"k8s.io/minikube/pkg/minikube/assets"
)
func addAddons(files *[]assets.CopyableFile, data interface{}) error {
// add addons to file list
// custom addons
if err := assets.AddMinikubeDirAssets(files); err != nil {
return errors.Wrap(err, "adding minikube dir assets")
}
// bundled addons
for _, addonBundle := range assets.Addons {
if isEnabled, err := addonBundle.IsEnabled(); err == nil && isEnabled {
for _, addon := range addonBundle.Assets {
if addon.IsTemplate() {
addonFile, err := addon.Evaluate(data)
if err != nil {
return errors.Wrapf(err, "evaluate bundled addon %s asset", addon.GetAssetName())
}
*files = append(*files, addonFile)
} else {
*files = append(*files, addon)
}
}
} else if err != nil {
return nil
}
}
return nil
}

View File

@ -17,12 +17,10 @@ limitations under the License.
package kubeadm
import (
"crypto/tls"
"os/exec"
"fmt"
"net"
"net/http"
// WARNING: Do not use path/filepath in this package unless you want bizarre Windows paths
@ -36,14 +34,13 @@ import (
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/spf13/viper"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
kconst "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/minikube/pkg/kapi"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/bootstrapper"
"k8s.io/minikube/pkg/minikube/bootstrapper/bsutil"
"k8s.io/minikube/pkg/minikube/bootstrapper/bsutil/verify"
"k8s.io/minikube/pkg/minikube/bootstrapper/images"
"k8s.io/minikube/pkg/minikube/command"
"k8s.io/minikube/pkg/minikube/config"
@ -55,20 +52,10 @@ import (
"k8s.io/minikube/pkg/util/retry"
)
// remote artifacts that must exist for minikube to function properly. The sign of a previously working installation.
// NOTE: /etc is not persistent across restarts, so don't bother checking there
var expectedArtifacts = []string{
"/var/lib/kubelet/kubeadm-flags.env",
"/var/lib/kubelet/config.yaml",
bsutil.EtcdDataDir(),
}
// SkipAdditionalPreflights are additional preflights we skip depending on the runtime in use.
var SkipAdditionalPreflights = map[string][]string{}
// Bootstrapper is a bootstrapper using kubeadm
type Bootstrapper struct {
c command.Runner
k8sClient *kubernetes.Clientset // kubernetes client used to verify pods inside cluster
contextName string
}
@ -83,7 +70,7 @@ func NewBootstrapper(api libmachine.API) (*Bootstrapper, error) {
if err != nil {
return nil, errors.Wrap(err, "command runner")
}
return &Bootstrapper{c: runner, contextName: name}, nil
return &Bootstrapper{c: runner, contextName: name, k8sClient: nil}, nil
}
// GetKubeletStatus returns the kubelet status
@ -106,23 +93,7 @@ func (k *Bootstrapper) GetKubeletStatus() (string, error) {
// GetAPIServerStatus returns the api-server status
func (k *Bootstrapper) GetAPIServerStatus(ip net.IP, apiserverPort int) (string, error) {
url := fmt.Sprintf("https://%s/healthz", net.JoinHostPort(ip.String(), strconv.Itoa(apiserverPort)))
// To avoid: x509: certificate signed by unknown authority
tr := &http.Transport{
Proxy: nil, // To avoid connectiv issue if http(s)_proxy is set.
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{Transport: tr}
resp, err := client.Get(url)
// Connection refused, usually.
if err != nil {
return state.Stopped.String(), nil
}
if resp.StatusCode != http.StatusOK {
glog.Warningf("%s response: %v %+v", url, err, resp)
return state.Error.String(), nil
}
return state.Running.String(), nil
return verify.APIServerStatus(ip, apiserverPort)
}
// LogCommands returns a map of log type to a command which will display that log.
@ -168,7 +139,7 @@ func (k *Bootstrapper) createCompatSymlinks() error {
}
func (k *Bootstrapper) existingConfig() error {
args := append([]string{"ls"}, expectedArtifacts...)
args := append([]string{"ls"}, bsutil.ExpectedRemoteArtifacts...)
_, err := k.c.RunCmd(exec.Command("sudo", args...))
return err
}
@ -209,7 +180,7 @@ func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
"Port-10250", // For "none" users who already have a kubelet online
"Swap", // For "none" users who have swap configured
}
ignore = append(ignore, SkipAdditionalPreflights[r.Name()]...)
ignore = append(ignore, bsutil.SkipAdditionalPreflights[r.Name()]...)
// Allow older kubeadm versions to function with newer Docker releases.
if version.LT(semver.MustParse("1.13.0")) {
@ -229,7 +200,7 @@ func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
if err != nil {
return err
}
return elevateKubeSystemPrivileges(client)
return bsutil.ElevateKubeSystemPrivileges(client)
}
if err := retry.Expo(elevate, time.Millisecond*500, 120*time.Second); err != nil {
@ -265,37 +236,12 @@ func (k *Bootstrapper) adjustResourceLimits() error {
return nil
}
func addAddons(files *[]assets.CopyableFile, data interface{}) error {
// add addons to file list
// custom addons
if err := assets.AddMinikubeDirAssets(files); err != nil {
return errors.Wrap(err, "adding minikube dir assets")
}
// bundled addons
for _, addonBundle := range assets.Addons {
if isEnabled, err := addonBundle.IsEnabled(); err == nil && isEnabled {
for _, addon := range addonBundle.Assets {
if addon.IsTemplate() {
addonFile, err := addon.Evaluate(data)
if err != nil {
return errors.Wrapf(err, "evaluate bundled addon %s asset", addon.GetAssetName())
}
*files = append(*files, addonFile)
} else {
*files = append(*files, addon)
}
}
} else if err != nil {
return nil
}
}
return nil
}
// client returns a Kubernetes client to use to speak to a kubeadm launched apiserver
func (k *Bootstrapper) client(k8s config.KubernetesConfig) (*kubernetes.Clientset, error) {
if k.k8sClient != nil {
return k.k8sClient, nil
}
config, err := kapi.ClientConfig(k.contextName)
if err != nil {
return nil, errors.Wrap(err, "client config")
@ -306,108 +252,30 @@ func (k *Bootstrapper) client(k8s config.KubernetesConfig) (*kubernetes.Clientse
glog.Errorf("Overriding stale ClientConfig host %s with %s", config.Host, endpoint)
config.Host = endpoint
}
return kubernetes.NewForConfig(config)
}
func (k *Bootstrapper) waitForAPIServerProcess(start time.Time, timeout time.Duration) error {
glog.Infof("waiting for apiserver process to appear ...")
err := wait.PollImmediate(time.Second*1, timeout, func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("cluster wait timed out during process check")
}
rr, ierr := k.c.RunCmd(exec.Command("sudo", "pgrep", "kube-apiserver"))
if ierr != nil {
glog.Warningf("pgrep apiserver: %v cmd: %s", ierr, rr.Command())
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("apiserver process never appeared")
c, err := kubernetes.NewForConfig(config)
if err == nil {
k.k8sClient = c
}
glog.Infof("duration metric: took %s to wait for apiserver process to appear ...", time.Since(start))
return nil
}
func (k *Bootstrapper) waitForAPIServerHealthz(start time.Time, k8s config.KubernetesConfig, timeout time.Duration) error {
glog.Infof("waiting for apiserver healthz status ...")
hStart := time.Now()
healthz := func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("cluster wait timed out during healthz check")
}
status, err := k.GetAPIServerStatus(net.ParseIP(k8s.NodeIP), k8s.NodePort)
if err != nil {
glog.Warningf("status: %v", err)
return false, nil
}
if status != "Running" {
return false, nil
}
return true, nil
}
if err := wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, healthz); err != nil {
return fmt.Errorf("apiserver healthz never reported healthy")
}
glog.Infof("duration metric: took %s to wait for apiserver healthz status ...", time.Since(hStart))
return nil
}
func (k *Bootstrapper) waitForSystemPods(start time.Time, k8s config.KubernetesConfig, timeout time.Duration) error {
glog.Infof("waiting for kube-system pods to appear ...")
pStart := time.Now()
client, err := k.client(k8s)
if err != nil {
return errors.Wrap(err, "client")
}
podStart := time.Time{}
podList := func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("cluster wait timed out during pod check")
}
// Wait for any system pod, as waiting for apiserver may block until etcd
pods, err := client.CoreV1().Pods("kube-system").List(meta.ListOptions{})
if len(pods.Items) < 2 {
podStart = time.Time{}
return false, nil
}
if err != nil {
podStart = time.Time{}
return false, nil
}
if podStart.IsZero() {
podStart = time.Now()
}
glog.Infof("%d kube-system pods found since %s", len(pods.Items), podStart)
if time.Since(podStart) > 2*kconst.APICallRetryInterval {
glog.Infof("stability requirement met, returning")
return true, nil
}
return false, nil
}
if err = wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, podList); err != nil {
return fmt.Errorf("apiserver never returned a pod list")
}
glog.Infof("duration metric: took %s to wait for pod list to return data ...", time.Since(pStart))
return nil
return c, err
}
// WaitForCluster blocks until the cluster appears to be healthy
func (k *Bootstrapper) WaitForCluster(k8s config.KubernetesConfig, timeout time.Duration) error {
start := time.Now()
out.T(out.Waiting, "Waiting for cluster to come online ...")
if err := k.waitForAPIServerProcess(start, timeout); err != nil {
if err := verify.APIServerProcess(k.c, start, timeout); err != nil {
return err
}
if err := k.waitForAPIServerHealthz(start, k8s, timeout); err != nil {
if err := verify.APIServerHealthz(start, k8s, timeout); err != nil {
return err
}
return k.waitForSystemPods(start, k8s, timeout)
c, err := k.client(k8s)
if err != nil {
return errors.Wrap(err, "get k8s client")
}
return verify.SystemPods(c, start, k8s, timeout)
}
// restartCluster restarts the Kubernetes cluster configured by kubeadm
@ -452,10 +320,16 @@ func (k *Bootstrapper) restartCluster(k8s config.KubernetesConfig) error {
}
// We must ensure that the apiserver is healthy before proceeding
if err := k.waitForAPIServerHealthz(time.Now(), k8s, kconst.DefaultControlPlaneTimeout); err != nil {
if err := verify.APIServerProcess(k.c, time.Now(), kconst.DefaultControlPlaneTimeout); err != nil {
return errors.Wrap(err, "apiserver healthz")
}
if err := k.waitForSystemPods(time.Now(), k8s, kconst.DefaultControlPlaneTimeout); err != nil {
client, err := k.client(k8s)
if err != nil {
return errors.Wrap(err, "getting k8s client")
}
if err := verify.SystemPods(client, time.Now(), k8s, kconst.DefaultControlPlaneTimeout); err != nil {
return errors.Wrap(err, "system pods")
}