Merge pull request #5348 from tstromberg/ncontext
Integration de-flake: expand lock scopes, sync clock at creation
commit f9b8039697
@@ -57,8 +57,26 @@ var (
// SetupCerts gets the generated credentials required to talk to the APIServer.
func SetupCerts(cmd command.Runner, k8s config.KubernetesConfig) error {
    // WARNING: This function was not designed for multiple profiles, so it is VERY racey:
    //
    // It updates a shared certificate file and uploads it to the apiserver before launch.
    //
    // If another process updates the shared certificate, it's invalid.
    // TODO: Instead of racey manipulation of a shared certificate, use per-profile certs
    spec := mutex.Spec{
        Name: "setupCerts",
        Clock: clock.WallClock,
        Delay: 15 * time.Second,
    }
    glog.Infof("acquiring lock: %+v", spec)
    releaser, err := mutex.Acquire(spec)
    if err != nil {
        return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
    }
    defer releaser.Release()

    localPath := constants.GetMinipath()
    glog.Infof("Setting up certificates for IP: %s\n", k8s.NodeIP)
    glog.Infof("Setting up %s for IP: %s\n", localPath, k8s.NodeIP)

    if err := generateCerts(k8s); err != nil {
        return errors.Wrap(err, "Error generating certs")
@@ -126,19 +144,6 @@ func SetupCerts(cmd command.Runner, k8s config.KubernetesConfig) error {
}

func generateCerts(k8s config.KubernetesConfig) error {
    // TODO: Instead of racey manipulation of a shared certificate, use per-profile certs
    spec := mutex.Spec{
        Name: "generateCerts",
        Clock: clock.WallClock,
        Delay: 10 * time.Second,
    }
    glog.Infof("acquiring lock: %+v", spec)
    releaser, err := mutex.Acquire(spec)
    if err != nil {
        return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
    }
    defer releaser.Release()

    serviceIP, err := util.GetServiceClusterIP(k8s.ServiceCIDR)
    if err != nil {
        return errors.Wrap(err, "getting service cluster ip")
@@ -228,6 +228,12 @@ func (k *Bootstrapper) createCompatSymlinks() error {

// StartCluster starts the cluster
func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
    start := time.Now()
    glog.Infof("StartCluster: %+v", k8s)
    defer func() {
        glog.Infof("StartCluster complete in %s", time.Since(start))
    }()

    version, err := parseKubernetesVersion(k8s.KubernetesVersion)
    if err != nil {
        return errors.Wrap(err, "parsing kubernetes version")
@@ -266,7 +272,15 @@ func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {

    glog.Infof("Configuring cluster permissions ...")

    if err := retry.Expo(elevateKubeSystemPrivileges, time.Millisecond*500, 60*time.Second); err != nil {
    elevate := func() error {
        client, err := k.client(k8s)
        if err != nil {
            return err
        }
        return elevateKubeSystemPrivileges(client)
    }

    if err := retry.Expo(elevate, time.Millisecond*500, 120*time.Second); err != nil {
        return errors.Wrap(err, "timed out waiting to elevate kube-system RBAC privileges")
    }
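The change above wraps client construction and the RBAC elevation in a single closure and retries the whole unit, so each attempt builds a fresh apiserver client instead of reusing one created before the apiserver was reachable. A rough sketch of that shape follows, with a hand-rolled stand-in for minikube's retry.Expo; the expoRetry helper and the simulated failure are illustrative, not library or PR code.

package main

import (
    "errors"
    "fmt"
    "time"
)

// expoRetry is an illustrative stand-in for an exponential-backoff retry helper:
// it retries fn with doubling sleeps until fn succeeds or maxTime is exhausted.
func expoRetry(fn func() error, initialDelay, maxTime time.Duration) error {
    deadline := time.Now().Add(maxTime)
    delay := initialDelay
    for {
        err := fn()
        if err == nil {
            return nil
        }
        if time.Now().Add(delay).After(deadline) {
            return fmt.Errorf("timed out after %s: %w", maxTime, err)
        }
        time.Sleep(delay)
        delay *= 2
    }
}

func main() {
    attempts := 0
    // The closure mirrors the shape used above: build whatever the call needs
    // (there, an apiserver client) inside the retried function so every attempt
    // starts from fresh state.
    elevate := func() error {
        attempts++
        if attempts < 3 {
            return errors.New("apiserver not ready yet") // simulated transient failure
        }
        return nil
    }

    if err := expoRetry(elevate, 500*time.Millisecond, 10*time.Second); err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Printf("succeeded after %d attempts\n", attempts)
}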
@@ -326,6 +340,23 @@ func addAddons(files *[]assets.CopyableFile, data interface{}) error {
    return nil
}

// client returns a Kubernetes client to use to speak to a kubeadm launched apiserver
func (k *Bootstrapper) client(k8s config.KubernetesConfig) (*kubernetes.Clientset, error) {
    // Catch case if WaitCluster was called with a stale ~/.kube/config
    config, err := kapi.ClientConfig(k.contextName)
    if err != nil {
        return nil, errors.Wrap(err, "client config")
    }

    endpoint := fmt.Sprintf("https://%s:%d", k8s.NodeIP, k8s.NodePort)
    if config.Host != endpoint {
        glog.Errorf("Overriding stale ClientConfig host %s with %s", config.Host, endpoint)
        config.Host = endpoint
    }

    return kubernetes.NewForConfig(config)
}

// WaitCluster blocks until Kubernetes appears to be healthy.
func (k *Bootstrapper) WaitCluster(k8s config.KubernetesConfig, timeout time.Duration) error {
    // Do not wait for "k8s-app" pods in the case of CNI, as they are managed
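The client helper added above exists to catch a ~/.kube/config whose server entry points at a stale endpoint: it loads the kubeconfig, forces the host to the known-good apiserver address, and only then builds a clientset. A sketch of the same idea using plain client-go primitives; the kubeconfig path and endpoint values below are placeholders.

package main

import (
    "fmt"
    "log"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Load a rest.Config from an on-disk kubeconfig (path is a placeholder).
    cfg, err := clientcmd.BuildConfigFromFlags("", "/home/user/.kube/config")
    if err != nil {
        log.Fatalf("client config: %v", err)
    }

    // If the kubeconfig is stale, override the host with the endpoint we know
    // is correct before building the clientset (values are illustrative).
    endpoint := fmt.Sprintf("https://%s:%d", "192.168.99.100", 8443)
    if cfg.Host != endpoint {
        log.Printf("overriding stale host %s with %s", cfg.Host, endpoint)
        cfg.Host = endpoint
    }

    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        log.Fatalf("building clientset: %v", err)
    }
    _ = client // use the clientset to talk to the apiserver
}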
@@ -341,22 +372,11 @@ func (k *Bootstrapper) WaitCluster(k8s config.KubernetesConfig, timeout time.Dur
        return errors.Wrap(err, "waiting for apiserver")
    }

    // Catch case if WaitCluster was called with a stale ~/.kube/config
    config, err := kapi.ClientConfig(k.contextName)
    client, err := k.client(k8s)
    if err != nil {
        return errors.Wrap(err, "client config")
        return errors.Wrap(err, "client")
    }

    endpoint := fmt.Sprintf("https://%s:%d", k8s.NodeIP, k8s.NodePort)
    if config.Host != endpoint {
        glog.Errorf("Overriding stale ClientConfig host %s with %s", config.Host, endpoint)
        config.Host = endpoint
    }

    client, err := kubernetes.NewForConfig(config)
    if err != nil {
        return errors.Wrap(err, "k8s client")
    }
    for _, p := range PodsByLayer {
        if componentsOnly && p.key != "component" { // skip component check if network plugin is cni
            continue
@@ -458,8 +478,12 @@ func (k *Bootstrapper) waitForAPIServer(k8s config.KubernetesConfig) error {
            return false, nil
        }
        return true, nil

        // TODO: Check apiserver/kubelet logs for fatal errors so that users don't
        // need to wait minutes to find out their flag didn't work.

    }
    err = wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, f)
    err = wait.PollImmediate(kconst.APICallRetryInterval, 2*kconst.DefaultControlPlaneTimeout, f)
    return err
}
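The doubled timeout above is handed to wait.PollImmediate, whose condition function returns (done bool, err error): false with a nil error means poll again, true means success, and a non-nil error aborts immediately. A small self-contained sketch of that contract; the interval, timeout, and readiness check are illustrative.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    start := time.Now()

    // The condition runs immediately and then once per interval until it
    // returns true, returns an error, or the overall timeout elapses.
    condition := func() (bool, error) {
        if time.Since(start) < 3*time.Second {
            return false, nil // not ready yet: keep polling
        }
        return true, nil // ready: stop polling
    }

    if err := wait.PollImmediate(500*time.Millisecond, 10*time.Second, condition); err != nil {
        fmt.Println("gave up waiting:", err)
        return
    }
    fmt.Println("condition satisfied after", time.Since(start).Round(time.Millisecond))
}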
@@ -24,8 +24,7 @@ import (
    "github.com/pkg/errors"
    rbac "k8s.io/api/rbac/v1beta1"
    meta "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/minikube/pkg/minikube/constants"
    "k8s.io/minikube/pkg/minikube/service"
    "k8s.io/client-go/kubernetes"
    "k8s.io/minikube/pkg/util/retry"
)
@@ -35,13 +34,8 @@ const (

// elevateKubeSystemPrivileges gives the kube-system service account
// cluster admin privileges to work with RBAC.
func elevateKubeSystemPrivileges() error {
func elevateKubeSystemPrivileges(client kubernetes.Interface) error {
    start := time.Now()
    k8s := service.K8s
    client, err := k8s.GetClientset(constants.DefaultK8sClientTimeout)
    if err != nil {
        return errors.Wrap(err, "getting clientset")
    }
    clusterRoleBinding := &rbac.ClusterRoleBinding{
        ObjectMeta: meta.ObjectMeta{
            Name: rbacName,
@@ -63,8 +57,7 @@ func elevateKubeSystemPrivileges() error {
        glog.Infof("Role binding %s already exists. Skipping creation.", rbacName)
        return nil
    }
    _, err = client.RbacV1beta1().ClusterRoleBindings().Create(clusterRoleBinding)
    if err != nil {
    if _, err := client.RbacV1beta1().ClusterRoleBindings().Create(clusterRoleBinding); err != nil {
        netErr, ok := err.(net.Error)
        if ok && netErr.Timeout() {
            return &retry.RetriableError{Err: errors.Wrap(err, "creating clusterrolebinding")}
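Passing a kubernetes.Interface into elevateKubeSystemPrivileges, rather than building a clientset inside it, also makes the elevation logic easy to exercise against client-go's fake clientset. The sketch below creates a ClusterRoleBinding through that interface; the binding name and subjects are illustrative, since the full binding is not shown in this excerpt.

package main

import (
    "fmt"

    rbac "k8s.io/api/rbac/v1beta1"
    meta "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/fake"
)

// bindClusterAdmin grants cluster-admin to kube-system service accounts.
// Accepting kubernetes.Interface lets callers pass a real or fake clientset.
func bindClusterAdmin(client kubernetes.Interface, name string) error {
    crb := &rbac.ClusterRoleBinding{
        ObjectMeta: meta.ObjectMeta{Name: name},
        Subjects: []rbac.Subject{{
            Kind:     "Group",
            APIGroup: "rbac.authorization.k8s.io",
            Name:     "system:serviceaccounts:kube-system",
        }},
        RoleRef: rbac.RoleRef{
            APIGroup: "rbac.authorization.k8s.io",
            Kind:     "ClusterRole",
            Name:     "cluster-admin",
        },
    }
    _, err := client.RbacV1beta1().ClusterRoleBindings().Create(crb)
    return err
}

func main() {
    // The fake clientset satisfies kubernetes.Interface, so the same code path
    // can be tested without a running apiserver.
    client := fake.NewSimpleClientset()
    if err := bindClusterAdmin(client, "example-rbac"); err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println("clusterrolebinding created")
}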
@@ -463,6 +463,11 @@ func createHost(api libmachine.API, config cfg.MachineConfig) (*host.Host, error

    if !localDriver(config.VMDriver) {
        showRemoteOsRelease(h.Driver)
        // Ensure that even new VM's have proper time synchronization up front
        // It's 2019, and I can't believe I am still dealing with time desync as a problem.
        if err := ensureSyncedGuestClock(h); err != nil {
            return h, err
        }
    } else {
        showLocalOsRelease()
    }
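ensureSyncedGuestClock itself is not part of this excerpt. As a purely hypothetical sketch of the idea, the host can compare its own clock against the guest's and reset the guest when the drift crosses a threshold; the function names, the two-second threshold, and the use of a local date command as a stand-in for the VM's command runner are all assumptions, not minikube code.

package main

import (
    "fmt"
    "os/exec"
    "strconv"
    "strings"
    "time"
)

// guestSeconds returns the "guest" clock as Unix seconds. Here the command runs
// locally as a stand-in; on a real VM it would go through the machine's runner.
func guestSeconds() (int64, error) {
    out, err := exec.Command("date", "+%s").Output()
    if err != nil {
        return 0, err
    }
    return strconv.ParseInt(strings.TrimSpace(string(out)), 10, 64)
}

// ensureSyncedClock compares host and guest clocks and would reset the guest
// clock when the drift exceeds the given threshold.
func ensureSyncedClock(threshold time.Duration) error {
    guest, err := guestSeconds()
    if err != nil {
        return err
    }
    drift := time.Duration(time.Now().Unix()-guest) * time.Second
    if drift < 0 {
        drift = -drift
    }
    if drift <= threshold {
        return nil // close enough, nothing to do
    }
    // On a real guest this is where the clock would be forced to the host's
    // time, for example by running date -s with the host's Unix time as root.
    fmt.Printf("drift %s exceeds %s; would reset guest clock to %d\n", drift, threshold, time.Now().Unix())
    return nil
}

func main() {
    if err := ensureSyncedClock(2 * time.Second); err != nil {
        fmt.Println("clock sync failed:", err)
    }
}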
@@ -76,6 +76,7 @@ func TestCreateHost(t *testing.T) {
    if exists {
        t.Fatal("Machine already exists.")
    }

    _, err := createHost(api, defaultMachineConfig)
    if err != nil {
        t.Fatalf("Error creating host: %v", err)
@@ -215,7 +216,7 @@ func TestStartHostConfig(t *testing.T) {
    provision.SetDetector(md)

    config := config.MachineConfig{
        VMDriver: constants.DefaultVMDriver,
        VMDriver: constants.DriverMock,
        DockerEnv: []string{"FOO=BAR"},
        DockerOpt: []string{"param=value"},
        Downloader: MockDownloader{},
@@ -62,8 +62,8 @@ func CreateProfile(name string, cfg *Config, miniHome ...string) error {
    if err != nil {
        return err
    }
    glog.Infof("Saving config:\n%s", data)
    path := profileFilePath(name, miniHome...)
    glog.Infof("Saving config to %s ...", path)
    if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
        return err
    }
@@ -19,8 +19,11 @@ package kubeconfig
import (
    "io/ioutil"
    "sync/atomic"
    "time"

    "github.com/golang/glog"
    "github.com/juju/clock"
    "github.com/juju/mutex"
    "github.com/pkg/errors"
    "k8s.io/client-go/tools/clientcmd/api"
)
@@ -116,9 +119,17 @@ func PopulateFromSettings(cfg *Settings, apiCfg *api.Config) error {
// activeContext is true when minikube is the CurrentContext
// If no CurrentContext is set, the given name will be used.
func Update(kcs *Settings) error {
    glog.Infoln("Using kubeconfig: ", kcs.filePath())
    // Add a lock around both the read, update, and write operations
    spec := mutex.Spec{Name: "kubeconfigUpdate", Clock: clock.WallClock, Delay: 10 * time.Second}
    glog.Infof("acquiring lock: %+v", spec)
    releaser, err := mutex.Acquire(spec)
    if err != nil {
        return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
    }
    defer releaser.Release()

    // read existing config or create new if does not exist
    glog.Infoln("Updating kubeconfig: ", kcs.filePath())
    kcfg, err := readOrNew(kcs.filePath())
    if err != nil {
        return err