it works! and simpler

pull/7973/head
Sharif Elgamal 2020-05-27 09:58:03 -07:00
parent e225968194
commit b5167834d0
7 changed files with 118 additions and 82 deletions

View File

@ -301,7 +301,7 @@ func startWithDriver(starter node.Starter, existing *config.ClusterConfig) (*kub
if existing == nil {
out.Ln("")
warnAboutMultiNode()
}
for i := 1; i < numNodes; i++ {
nodeName := node.Name(i + 1)
n := config.Node{
@ -316,6 +316,16 @@ func startWithDriver(starter node.Starter, existing *config.ClusterConfig) (*kub
return nil, errors.Wrap(err, "adding node")
}
}
} else {
for _, n := range existing.Nodes {
if !n.ControlPlane {
err := node.Add(starter.Cfg, n)
if err != nil {
return nil, errors.Wrap(err, "adding node")
}
}
}
}
}
}

View File

@ -604,14 +604,25 @@ func (k *Bootstrapper) JoinCluster(cc config.ClusterConfig, n config.Node, joinC
glog.Infof("JoinCluster complete in %s", time.Since(start))
}()
if preExists {
return k.restartWorker(cc, n)
}
// Join the master by specifying its token
joinCmd = fmt.Sprintf("%s --node-name=%s", joinCmd, driver.MachineName(cc, n))
join := func() error {
// reset first to clear any possibly existing state
_, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s reset -f", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion))))
if err != nil {
glog.Infof("kubeadm reset failed, continuing anyway: %v", err)
}
out, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", joinCmd))
if err != nil {
return errors.Wrapf(err, "cmd failed: %s\n%+v\n", joinCmd, out)
return errors.Wrapf(err, "cmd failed: %s\n%+v\n", joinCmd, out.Output())
}
return nil
}
if err := retry.Expo(join, 10*time.Second, 1*time.Minute); err != nil {
return errors.Wrap(err, "joining cp")
}
if _, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", "sudo systemctl daemon-reload && sudo systemctl enable kubelet && sudo systemctl start kubelet")); err != nil {
@ -621,54 +632,11 @@ func (k *Bootstrapper) JoinCluster(cc config.ClusterConfig, n config.Node, joinC
return nil
}
func (k *Bootstrapper) restartWorker(cc config.ClusterConfig, n config.Node) error {
if err := k.clearStaleConfigs(cc); err != nil {
return errors.Wrap(err, "clearing stale configs")
}
cmd := fmt.Sprintf("%s join phase kubelet-start %s --token %s --discovery-token-unsafe-skip-ca-verification", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion), net.JoinHostPort(constants.ControlPlaneAlias, strconv.Itoa(constants.APIServerPort)), n.Token)
_, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", cmd))
if err != nil {
if !strings.Contains(err.Error(), "status \"Ready\" already exists in the cluster") {
return errors.Wrap(err, "running join phase kubelet-start")
}
}
// This can fail during upgrades if the old pods have not shut down yet
kubeletStatus := func() error {
st := kverify.KubeletStatus(k.c)
if st != state.Running {
return errors.New("kubelet not running")
}
return nil
}
if err = retry.Expo(kubeletStatus, 100*time.Microsecond, 30*time.Second); err != nil {
glog.Warningf("kubelet is not ready: %v", err)
return errors.Wrap(err, "kubelet")
}
return nil
}
// GenerateToken creates a token and returns the appropriate kubeadm join command to run, or the already existing token
func (k *Bootstrapper) GenerateToken(cc *config.ClusterConfig, n *config.Node) (string, error) {
if n.Token != "" {
return "", nil
}
// Generate the token so we can store it
genTokenCmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("%s token generate", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion)))
r, err := k.c.RunCmd(genTokenCmd)
if err != nil {
return "", errors.Wrap(err, "generating bootstrap token")
}
token := strings.TrimSpace(r.Stdout.String())
n.Token = token
// Take that generated token and use it to get a kubeadm join command
tokenCmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("%s token create %s --print-join-command --ttl=0", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion), token))
r, err = k.c.RunCmd(tokenCmd)
tokenCmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("%s token create --print-join-command --ttl=0", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion)))
r, err := k.c.RunCmd(tokenCmd)
if err != nil {
return "", errors.Wrap(err, "generating join command")
}
@ -676,6 +644,9 @@ func (k *Bootstrapper) GenerateToken(cc *config.ClusterConfig, n *config.Node) (
joinCmd := r.Stdout.String()
joinCmd = strings.Replace(joinCmd, "kubeadm", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion), 1)
joinCmd = fmt.Sprintf("%s --ignore-preflight-errors=all", strings.TrimSpace(joinCmd))
if cc.KubernetesConfig.CRISocket != "" {
joinCmd = fmt.Sprintf("%s --cri-socket %s", joinCmd, cc.KubernetesConfig.CRISocket)
}
// Save the new token for later use
err = config.SaveNode(cc, n)
@ -806,7 +777,7 @@ func (k *Bootstrapper) UpdateNode(cfg config.ClusterConfig, n config.Node, r cru
// Copy the default CNI config (k8s.conf), so that kubelet can successfully
// start a Pod in the case a user hasn't manually installed any CNI plugin
// and minikube was started with "--extra-config=kubelet.network-plugin=cni".
if cfg.KubernetesConfig.EnableDefaultCNI {
if cfg.KubernetesConfig.EnableDefaultCNI && !config.MultiNode(cfg) {
files = append(files, assets.NewMemoryAssetTarget([]byte(defaultCNIConfig), bsutil.DefaultCNIConfigPath, "0644"))
}

View File

@ -112,7 +112,6 @@ type Node struct {
KubernetesVersion string
ControlPlane bool
Worker bool
Token string
}
// VersionedExtraOption holds information on flags to apply to a specific range

View File

@ -136,7 +136,11 @@ func New(c Config) (Manager, error) {
switch c.Type {
case "", "docker":
return &Docker{Socket: c.Socket, Runner: c.Runner, Init: sm}, nil
return &Docker{
Socket: c.Socket,
Runner: c.Runner,
Init: sm,
}, nil
case "crio", "cri-o":
return &CRIO{
Socket: c.Socket,

View File

@ -196,8 +196,8 @@ func CacheAndLoadImages(images []string) error {
status, err := Status(api, m)
if err != nil {
glog.Warningf("error getting status for %s: %v", pName, err)
failed = append(failed, pName)
glog.Warningf("error getting status for %s: %v", m, err)
failed = append(failed, m)
continue
}
@ -205,7 +205,7 @@ func CacheAndLoadImages(images []string) error {
h, err := api.Load(m)
if err != nil {
glog.Warningf("Failed to load machine %q: %v", m, err)
failed = append(failed, pName)
failed = append(failed, m)
continue
}
cr, err := CommandRunner(h)
@ -214,10 +214,10 @@ func CacheAndLoadImages(images []string) error {
}
err = LoadImages(c, cr, images, constants.ImageCacheDir)
if err != nil {
failed = append(failed, pName)
failed = append(failed, m)
glog.Warningf("Failed to load cached images for profile %s. make sure the profile is running. %v", pName, err)
}
succeeded = append(succeeded, pName)
succeeded = append(succeeded, m)
}
}
}

View File

@ -163,18 +163,15 @@ func Start(starter Starter, apiServer bool) (*kubeconfig.Settings, error) {
}
// Make sure to use the command runner for the control plane to generate the join token
var joinCmd string
if !starter.PreExists || starter.Node.Token == "" {
cpBs, err := cluster.ControlPlaneBootstrapper(starter.MachineAPI, starter.Cfg, viper.GetString(cmdcfg.Bootstrapper))
if err != nil {
return nil, errors.Wrap(err, "getting control plane bootstrapper")
}
joinCmd, err = cpBs.GenerateToken(starter.Cfg, starter.Node)
joinCmd, err := cpBs.GenerateToken(starter.Cfg, starter.Node)
if err != nil {
return nil, errors.Wrap(err, "generating join token")
}
}
if err = bs.JoinCluster(*starter.Cfg, *starter.Node, joinCmd, starter.PreExists); err != nil {
return nil, errors.Wrap(err, "joining cluster")

View File

@ -45,6 +45,8 @@ func TestMultiNode(t *testing.T) {
{"AddNode", validateAddNodeToMultiNode},
{"StopNode", validateStopRunningNode},
{"StartAfterStop", validateStartNodeAfterStop},
{"StopMultiNode", validateStopMultiNodeCluster},
{"RestartMultiNode", validateRestartMultiNodeCluster},
{"DeleteNode", validateDeleteNodeFromMultiNode},
}
for _, tc := range tests {
@ -149,8 +151,9 @@ func validateStartNodeAfterStop(ctx context.Context, t *testing.T, profile strin
}
// Start the node back up
rr, err := Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "node", "start", ThirdNodeName))
rr, err := Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "node", "start", ThirdNodeName, "--alsologtostderr"))
if err != nil {
t.Logf(rr.Stderr.String())
t.Errorf("node start returned an error. args %q: %v", rr.Command(), err)
}
@ -175,6 +178,58 @@ func validateStartNodeAfterStop(ctx context.Context, t *testing.T, profile strin
}
}
func validateStopMultiNodeCluster(ctx context.Context, t *testing.T, profile string) {
// Run minikube node stop on that node
rr, err := Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "stop"))
if err != nil {
t.Errorf("node stop returned an error. args %q: %v", rr.Command(), err)
}
// Run status to see the stopped hosts
rr, err = Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "status"))
// Exit code 7 means one host is stopped, which we are expecting
if err != nil && rr.ExitCode != 7 {
t.Fatalf("failed to run minikube status. args %q : %v", rr.Command(), err)
}
// Make sure minikube status shows 3 stopped nodes
rr, err = Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "status", "--alsologtostderr"))
if err != nil && rr.ExitCode != 7 {
t.Fatalf("failed to run minikube status. args %q : %v", rr.Command(), err)
}
if strings.Count(rr.Stdout.String(), "host: Stopped") != 3 {
t.Errorf("incorrect number of stopped hosts: args %q: %v", rr.Command(), rr.Stdout.String())
}
if strings.Count(rr.Stdout.String(), "kubelet: Stopped") != 3 {
t.Errorf("incorrect number of stopped kubelets: args %q: %v", rr.Command(), rr.Stdout.String())
}
}
func validateRestartMultiNodeCluster(ctx context.Context, t *testing.T, profile string) {
// Restart a full cluster with minikube start
startArgs := append([]string{"start", "-p", profile}, StartArgs()...)
rr, err := Run(t, exec.CommandContext(ctx, Target(), startArgs...))
if err != nil {
t.Fatalf("failed to start cluster. args %q : %v", rr.Command(), err)
}
// Make sure minikube status shows 3 running nodes
rr, err = Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "status", "--alsologtostderr"))
if err != nil {
t.Fatalf("failed to run minikube status. args %q : %v", rr.Command(), err)
}
if strings.Count(rr.Stdout.String(), "host: Running") != 3 {
t.Errorf("status says both hosts are not running: args %q: %v", rr.Command(), rr.Stdout.String())
}
if strings.Count(rr.Stdout.String(), "kubelet: Running") != 3 {
t.Errorf("status says both kubelets are not running: args %q: %v", rr.Command(), rr.Stdout.String())
}
}
func validateDeleteNodeFromMultiNode(ctx context.Context, t *testing.T, profile string) {
// Start the node back up