From f3f7899521cc6eefa906dcfb9dd0eb4ab984a99b Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Wed, 5 Mar 2025 08:35:58 +0000 Subject: [PATCH] Move apiserver ready wait into common channel Splits server startup into prepare/start phases. Server's agent is now started after server is prepared, but before it is started. This allows us to properly bootstrap the executor before starting server components, and use the executor to provide a shared channel to wait on apiserver readiness. This allows us to replace four separate callers of WaitForAPIServerReady with reads from a common ready channel. Signed-off-by: Brad Davidson (cherry picked from commit 529e748ac7f0b9aa96a8b14218dd5225d17c5900) Signed-off-by: Brad Davidson --- pkg/agent/run.go | 43 +++++++------ pkg/agent/tunnel/tunnel.go | 27 ++++---- pkg/cli/agent/agent.go | 7 ++- pkg/cli/server/server.go | 53 +++++++++------- pkg/cluster/cluster.go | 15 ++--- pkg/daemons/config/types.go | 8 ++- pkg/daemons/control/server.go | 105 +++++++++---------------------- pkg/daemons/executor/embed.go | 25 +++++--- pkg/daemons/executor/etcd.go | 5 +- pkg/daemons/executor/executor.go | 17 +++-- pkg/server/server.go | 24 +++++-- pkg/util/api.go | 16 +++++ 12 files changed, 184 insertions(+), 161 deletions(-) diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 9623e7df79..221af48dc9 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -174,10 +174,27 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error { return err } - if err := util.WaitForAPIServerReady(ctx, nodeConfig.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil { - return pkgerrors.WithMessage(err, "failed to wait for apiserver ready") - } + go func() { + <-executor.APIServerReadyChan() + if err := startNetwork(ctx, nodeConfig); err != nil { + logrus.Fatalf("Failed to start networking: %v", err) + } + // By default, the server is responsible for notifying systemd + // On agent-only nodes, the agent will notify systemd + if notifySocket != "" { + logrus.Info(version.Program + " agent is up and running") + os.Setenv("NOTIFY_SOCKET", notifySocket) + systemd.SdNotify(true, "READY=1\n") + } + }() + + return nil +} + +// startNetwork updates the network annotations on the node, and starts flannel +// and the kube-router netpol controller, if enabled. +func startNetwork(ctx context.Context, nodeConfig *daemonconfig.Node) error { // Use the kubelet kubeconfig to update annotations on the local node kubeletClient, err := util.GetClientSet(nodeConfig.AgentConfig.KubeConfigKubelet) if err != nil { @@ -200,16 +217,7 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error { } } - // By default, the server is responsible for notifying systemd - // On agent-only nodes, the agent will notify systemd - if notifySocket != "" { - logrus.Info(version.Program + " agent is up and running") - os.Setenv("NOTIFY_SOCKET", notifySocket) - systemd.SdNotify(true, "READY=1\n") - } - - <-ctx.Done() - return ctx.Err() + return nil } // getConntrackConfig uses the kube-proxy code to parse the user-provided kube-proxy-arg values, and @@ -258,8 +266,7 @@ func getConntrackConfig(nodeConfig *daemonconfig.Node) (*kubeproxyconfig.KubePro // RunStandalone bootstraps the executor, but does not run the kubelet or containerd. // This allows other bits of code that expect the executor to be set up properly to function -// even when the agent is disabled. It will only return in case of error or context -// cancellation. +// even when the agent is disabled. 
func RunStandalone(ctx context.Context, cfg cmds.Agent) error { proxy, err := createProxyAndValidateToken(ctx, &cfg) if err != nil { @@ -298,13 +305,11 @@ func RunStandalone(ctx context.Context, cfg cmds.Agent) error { } } - <-ctx.Done() - return ctx.Err() + return nil } // Run sets up cgroups, configures the LB proxy, and triggers startup -// of containerd and kubelet. It will only return in case of error or context -// cancellation. +// of containerd and kubelet. func Run(ctx context.Context, cfg cmds.Agent) error { if err := cgroups.Validate(); err != nil { return err diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index ae2d1cba09..903b13a49e 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -17,6 +17,7 @@ import ( "github.com/k3s-io/k3s/pkg/agent/proxy" "github.com/k3s-io/k3s/pkg/clientaccess" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/daemons/executor" "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" "github.com/rancher/remotedialer" @@ -94,11 +95,9 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er startTime: time.Now().Truncate(time.Second), } - apiServerReady := make(chan struct{}) + rbacReady := make(chan struct{}) go func() { - if err := util.WaitForAPIServerReady(ctx, config.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil { - logrus.Fatalf("Tunnel watches failed to wait for apiserver ready: %v", err) - } + <-executor.APIServerReadyChan() if err := util.WaitForRBACReady(ctx, config.AgentConfig.KubeConfigK3sController, util.DefaultAPIServerReadyTimeout, authorizationv1.ResourceAttributes{ Namespace: metav1.NamespaceDefault, Verb: "list", @@ -107,14 +106,14 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er logrus.Fatalf("Tunnel watches failed to wait for RBAC: %v", err) } - close(apiServerReady) + close(rbacReady) }() // We don't need to run the tunnel authorizer if the container runtime endpoint is /dev/null, // signifying that this is an agentless server that will not register a node. if config.ContainerRuntimeEndpoint != "/dev/null" { // Allow the kubelet port, as published via our node object. - go tunnel.setKubeletPort(ctx, apiServerReady) + go tunnel.setKubeletPort(ctx, rbacReady) switch tunnel.mode { case daemonconfig.EgressSelectorModeCluster: @@ -122,7 +121,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er tunnel.clusterAuth(config) case daemonconfig.EgressSelectorModePod: // In Pod mode, we watch pods assigned to this node, and allow their addresses, as well as ports used by containers with host network. 
- go tunnel.watchPods(ctx, apiServerReady, config) + go tunnel.watchPods(ctx, rbacReady, config) } } @@ -165,7 +164,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er wg := &sync.WaitGroup{} - go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, config, proxy) + go tunnel.watchEndpoints(ctx, rbacReady, wg, tlsConfig, config, proxy) wait := make(chan int, 1) go func() { @@ -184,8 +183,8 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er } // setKubeletPort retrieves the configured kubelet port from our node object -func (a *agentTunnel) setKubeletPort(ctx context.Context, apiServerReady <-chan struct{}) { - <-apiServerReady +func (a *agentTunnel) setKubeletPort(ctx context.Context, rbacReady <-chan struct{}) { + <-rbacReady wait.PollUntilContextTimeout(ctx, time.Second, util.DefaultAPIServerReadyTimeout, true, func(ctx context.Context) (bool, error) { var readyTime metav1.Time @@ -231,7 +230,7 @@ func (a *agentTunnel) clusterAuth(config *daemonconfig.Node) { // watchPods watches for pods assigned to this node, adding their IPs to the CIDR list. // If the pod uses host network, we instead add the -func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struct{}, config *daemonconfig.Node) { +func (a *agentTunnel) watchPods(ctx context.Context, rbacReady <-chan struct{}, config *daemonconfig.Node) { for _, ip := range config.AgentConfig.NodeIPs { if cidr, err := util.IPToIPNet(ip); err == nil { logrus.Infof("Tunnel authorizer adding Node IP %s", cidr) @@ -239,7 +238,7 @@ func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struc } } - <-apiServerReady + <-rbacReady nodeName := os.Getenv("NODE_NAME") pods := a.client.CoreV1().Pods(metav1.NamespaceNone) @@ -308,11 +307,11 @@ func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struc // WatchEndpoints attempts to create tunnels to all supervisor addresses. Once the // apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come // and go from the cluster. 
-func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, node *daemonconfig.Node, proxy proxy.Proxy) { +func (a *agentTunnel) watchEndpoints(ctx context.Context, rbacReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, node *daemonconfig.Node, proxy proxy.Proxy) { syncProxyAddresses := a.getProxySyncer(ctx, wg, tlsConfig, proxy) refreshFromSupervisor := getAPIServersRequester(node, proxy, syncProxyAddresses) - <-apiServerReady + <-rbacReady endpoints := a.client.CoreV1().Endpoints(metav1.NamespaceDefault) fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String() diff --git a/pkg/cli/agent/agent.go b/pkg/cli/agent/agent.go index 6fc542fadf..0e34a559a9 100644 --- a/pkg/cli/agent/agent.go +++ b/pkg/cli/agent/agent.go @@ -130,5 +130,10 @@ func Run(ctx *cli.Context) error { return https.Start(ctx, nodeConfig, nil) } - return agent.Run(contextCtx, cfg) + if err := agent.Run(contextCtx, cfg); err != nil { + return err + } + + <-contextCtx.Done() + return contextCtx.Err() } diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index f54c49d3f9..59855a552b 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -18,6 +18,7 @@ import ( "github.com/k3s-io/k3s/pkg/cli/cmds" "github.com/k3s-io/k3s/pkg/clientaccess" "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/daemons/executor" "github.com/k3s-io/k3s/pkg/datadir" "github.com/k3s-io/k3s/pkg/etcd" k3smetrics "github.com/k3s-io/k3s/pkg/metrics" @@ -513,28 +514,10 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont ctx := signals.SetupSignalContext() - if err := server.StartServer(ctx, &serverConfig, cfg); err != nil { + if err := server.PrepareServer(ctx, &serverConfig, cfg); err != nil { return err } - go cmds.WriteCoverage(ctx) - - go func() { - if !serverConfig.ControlConfig.DisableAPIServer { - <-serverConfig.ControlConfig.Runtime.APIServerReady - logrus.Info("Kube API server is now running") - serverConfig.ControlConfig.Runtime.StartupHooksWg.Wait() - } - if !serverConfig.ControlConfig.DisableETCD { - <-serverConfig.ControlConfig.Runtime.ETCDReady - logrus.Info("ETCD server is now running") - } - - logrus.Info(version.Program + " is up and running") - os.Setenv("NOTIFY_SOCKET", notifySocket) - systemd.SdNotify(true, "READY=1\n") - }() - url := fmt.Sprintf("https://%s:%d", serverConfig.ControlConfig.BindAddressOrLoopback(false, true), serverConfig.ControlConfig.SupervisorPort) token, err := clientaccess.FormatToken(serverConfig.ControlConfig.Runtime.AgentToken, serverConfig.ControlConfig.Runtime.ServerCA) if err != nil { @@ -604,10 +587,38 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont if cfg.DisableAgent { agentConfig.ContainerRuntimeEndpoint = "/dev/null" - return agent.RunStandalone(ctx, agentConfig) + if err := agent.RunStandalone(ctx, agentConfig); err != nil { + return err + } + } else { + if err := agent.Run(ctx, agentConfig); err != nil { + return err + } } - return agent.Run(ctx, agentConfig) + go cmds.WriteCoverage(ctx) + + go func() { + if !serverConfig.ControlConfig.DisableETCD { + <-serverConfig.ControlConfig.Runtime.ETCDReady + logrus.Info("ETCD server is now running") + } + if !serverConfig.ControlConfig.DisableAPIServer { + <-executor.APIServerReadyChan() + logrus.Info("Kube API server is now running") + serverConfig.ControlConfig.Runtime.StartupHooksWg.Wait() + } + logrus.Info(version.Program + " is up and 
running") + os.Setenv("NOTIFY_SOCKET", notifySocket) + systemd.SdNotify(true, "READY=1\n") + }() + + if err := server.StartServer(ctx, &serverConfig, cfg); err != nil { + return err + } + + <-ctx.Done() + return ctx.Err() } // validateNetworkConfig ensures that the network configuration values make sense. diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 67c58b92a8..69bdb97df7 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -27,16 +27,17 @@ type Cluster struct { cnFilterFunc func(...string) []string } -// Start creates the dynamic tls listener, http request handler, -// handles starting and writing/reading bootstrap data, and returns a channel +// ListenAndServe creates the dynamic tls listener, registers http request +// handlers, and starts the supervisor API server loop. +func (c *Cluster) ListenAndServe(ctx context.Context) error { + // Set up the dynamiclistener and http request handlers + return c.initClusterAndHTTPS(ctx) +} + +// Start handles writing/reading bootstrap data, and returns a channel // that will be closed when datastore is ready. If embedded etcd is in use, // a secondary call to Cluster.save is made. func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { - // Set up the dynamiclistener and http request handlers - if err := c.initClusterAndHTTPS(ctx); err != nil { - return nil, pkgerrors.WithMessage(err, "init cluster datastore and https") - } - if c.config.DisableETCD { ready := make(chan struct{}) defer close(ready) diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index fa7618bfd8..c59b7f3e9f 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -255,6 +255,7 @@ type Control struct { SANSecurity bool PrivateIP string Runtime *ControlRuntime `json:"-"` + Cluster Cluster `json:"-"` } // BindAddressOrLoopback returns an IPv4 or IPv6 address suitable for embedding in @@ -312,7 +313,6 @@ type ControlRuntimeBootstrap struct { type ControlRuntime struct { ControlRuntimeBootstrap - APIServerReady <-chan struct{} ContainerRuntimeReady <-chan struct{} ETCDReady <-chan struct{} StartupHooksWg *sync.WaitGroup @@ -381,6 +381,12 @@ type ControlRuntime struct { EtcdConfig endpoint.ETCDConfig } +type Cluster interface { + Bootstrap(ctx context.Context, reset bool) error + ListenAndServe(ctx context.Context) error + Start(ctx context.Context) (<-chan struct{}, error) +} + type CoreFactory interface { Core() core.Interface Sync(ctx context.Context) error diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index 156fca100a..eddec8faba 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -36,7 +36,9 @@ import ( _ "k8s.io/component-base/metrics/prometheus/restclient" ) -func Server(ctx context.Context, cfg *config.Control) error { +// Prepare loads bootstrap data from the datastore and sets up the initial +// tunnel server request handler and stub authenticator. +func Prepare(ctx context.Context, cfg *config.Control) error { rand.Seed(time.Now().UTC().UnixNano()) logsapi.ReapplyHandling = logsapi.ReapplyHandlingIgnoreUnchanged @@ -62,6 +64,18 @@ func Server(ctx context.Context, cfg *config.Control) error { } cfg.Runtime.Authenticator = auth + return nil +} + +// Server starts the apiserver and whatever other control-plane components are +// not disabled on this node. 
+func Server(ctx context.Context, cfg *config.Control) error { + if ready, err := cfg.Cluster.Start(ctx); err != nil { + return pkgerrors.WithMessage(err, "failed to start cluster") + } else { + cfg.Runtime.ETCDReady = ready + } + if !cfg.DisableAPIServer { go waitForAPIServerHandlers(ctx, cfg.Runtime) @@ -70,12 +84,6 @@ func Server(ctx context.Context, cfg *config.Control) error { } } - // Wait for an apiserver to become available before starting additional controllers, - // even if we're not running an apiserver locally. - if err := waitForAPIServerInBackground(ctx, cfg.Runtime); err != nil { - return err - } - if !cfg.DisableScheduler { if err := scheduler(ctx, cfg); err != nil { return err @@ -139,7 +147,7 @@ func controllerManager(ctx context.Context, cfg *config.Control) error { args := config.GetArgs(argsMap, cfg.ExtraControllerArgs) logrus.Infof("Running kube-controller-manager %s", config.ArgString(args)) - return executor.ControllerManager(ctx, cfg.Runtime.APIServerReady, args) + return executor.ControllerManager(ctx, args) } func scheduler(ctx context.Context, cfg *config.Control) error { @@ -165,22 +173,12 @@ func scheduler(ctx context.Context, cfg *config.Control) error { args := config.GetArgs(argsMap, cfg.ExtraSchedulerAPIArgs) - schedulerNodeReady := make(chan struct{}) + nodeReady := make(chan struct{}) go func() { - defer close(schedulerNodeReady) + defer close(nodeReady) - apiReadyLoop: - for { - select { - case <-ctx.Done(): - return - case <-cfg.Runtime.APIServerReady: - break apiReadyLoop - case <-time.After(30 * time.Second): - logrus.Infof("Waiting for API server to become available to start kube-scheduler") - } - } + <-executor.APIServerReadyChan() // If we're running the embedded cloud controller, wait for it to untaint at least one // node (usually, the local node) before starting the scheduler to ensure that it @@ -194,7 +192,7 @@ func scheduler(ctx context.Context, cfg *config.Control) error { }() logrus.Infof("Running kube-scheduler %s", config.ArgString(args)) - return executor.Scheduler(ctx, schedulerNodeReady, args) + return executor.Scheduler(ctx, nodeReady, args) } func apiServer(ctx context.Context, cfg *config.Control) error { @@ -287,6 +285,9 @@ func defaults(config *config.Control) { } } +// prepare sets up the server data-dir, calls deps.GenServerDeps to +// set paths, extracts the cluster bootstrap data to the +// configured paths, and starts the supervisor listener. 
func prepare(ctx context.Context, config *config.Control) error { defaults(config) @@ -306,8 +307,8 @@ func prepare(ctx context.Context, config *config.Control) error { deps.CreateRuntimeCertFiles(config) - cluster := cluster.New(config) - if err := cluster.Bootstrap(ctx, config.ClusterReset); err != nil { + config.Cluster = cluster.New(config) + if err := config.Cluster.Bootstrap(ctx, config.ClusterReset); err != nil { return pkgerrors.WithMessage(err, "failed to bootstrap cluster data") } @@ -315,10 +316,8 @@ func prepare(ctx context.Context, config *config.Control) error { return pkgerrors.WithMessage(err, "failed to generate server dependencies") } - if ready, err := cluster.Start(ctx); err != nil { - return pkgerrors.WithMessage(err, "failed to start cluster") - } else { - config.Runtime.ETCDReady = ready + if err := config.Cluster.ListenAndServe(ctx); err != nil { + return pkgerrors.WithMessage(err, "failed to start supervisor listener") } return nil @@ -385,17 +384,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error { go func() { defer close(ccmRBACReady) - apiReadyLoop: - for { - select { - case <-ctx.Done(): - return - case <-cfg.Runtime.APIServerReady: - break apiReadyLoop - case <-time.After(30 * time.Second): - logrus.Infof("Waiting for API server to become available to start cloud-controller-manager") - } - } + <-executor.APIServerReadyChan() logrus.Infof("Waiting for cloud-controller-manager privileges to become available") for { @@ -438,43 +427,6 @@ func waitForAPIServerHandlers(ctx context.Context, runtime *config.ControlRuntim runtime.APIServer = handler } -func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRuntime) error { - done := make(chan struct{}) - runtime.APIServerReady = done - - go func() { - defer close(done) - - etcdLoop: - for { - select { - case <-ctx.Done(): - return - case <-runtime.ETCDReady: - break etcdLoop - case <-time.After(30 * time.Second): - logrus.Infof("Waiting for etcd server to become available") - } - } - - logrus.Infof("Waiting for API server to become available") - for { - select { - case <-ctx.Done(): - return - case err := <-promise(func() error { return util.WaitForAPIServerReady(ctx, runtime.KubeConfigSupervisor, 30*time.Second) }): - if err != nil { - logrus.Infof("Waiting for API server to become available") - continue - } - return - } - } - }() - - return nil -} - func promise(f func() error) <-chan error { c := make(chan error, 1) go func() { @@ -487,7 +439,6 @@ func promise(f func() error) <-chan error { // waitForUntaintedNode watches nodes, waiting to find one not tainted as // uninitialized by the external cloud provider. 
func waitForUntaintedNode(ctx context.Context, kubeConfig string) error { - restConfig, err := util.GetRESTConfig(kubeConfig) if err != nil { return err diff --git a/pkg/daemons/executor/embed.go b/pkg/daemons/executor/embed.go index d036047150..656e7f9fbc 100644 --- a/pkg/daemons/executor/embed.go +++ b/pkg/daemons/executor/embed.go @@ -43,6 +43,7 @@ func init() { } func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error { + e.apiServerReady = util.APIServerReadyChan(ctx, nodeConfig.AgentConfig.KubeConfigK3sController, util.DefaultAPIServerReadyTimeout) e.nodeConfig = nodeConfig go func() { @@ -72,17 +73,12 @@ func (e *Embedded) Kubelet(ctx context.Context, args []string) error { command.SetArgs(args) go func() { + <-e.APIServerReadyChan() defer func() { if err := recover(); err != nil { logrus.WithField("stack", string(debug.Stack())).Fatalf("kubelet panic: %v", err) } }() - // The embedded executor doesn't need the kubelet to come up to host any components, and - // having it come up on servers before the apiserver is available causes a lot of log spew. - // Agents don't have access to the server's apiReady channel, so just wait directly. - if err := util.WaitForAPIServerReady(ctx, e.nodeConfig.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil { - logrus.Fatalf("Kubelet failed to wait for apiserver ready: %v", err) - } err := command.ExecuteContext(ctx) if err != nil && !errors.Is(err, context.Canceled) { logrus.Errorf("kubelet exited: %v", err) @@ -99,6 +95,7 @@ func (e *Embedded) KubeProxy(ctx context.Context, args []string) error { command.SetArgs(daemonconfig.GetArgs(platformKubeProxyArgs(e.nodeConfig), args)) go func() { + <-e.APIServerReadyChan() defer func() { if err := recover(); err != nil { logrus.WithField("stack", string(debug.Stack())).Fatalf("kube-proxy panic: %v", err) @@ -142,12 +139,13 @@ func (*Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args return nil } -func (e *Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error { +func (e *Embedded) Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error { command := sapp.NewSchedulerCommand() command.SetArgs(args) go func() { - <-apiReady + <-e.APIServerReadyChan() + <-nodeReady defer func() { if err := recover(); err != nil { logrus.WithField("stack", string(debug.Stack())).Fatalf("scheduler panic: %v", err) @@ -164,12 +162,12 @@ func (e *Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args return nil } -func (*Embedded) ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error { +func (e *Embedded) ControllerManager(ctx context.Context, args []string) error { command := cmapp.NewControllerManagerCommand() command.SetArgs(args) go func() { - <-apiReady + <-e.APIServerReadyChan() defer func() { if err := recover(); err != nil { logrus.WithField("stack", string(debug.Stack())).Fatalf("controller-manager panic: %v", err) @@ -243,3 +241,10 @@ func (e *Embedded) Containerd(ctx context.Context, cfg *daemonconfig.Node) error func (e *Embedded) Docker(ctx context.Context, cfg *daemonconfig.Node) error { return cridockerd.Run(ctx, cfg) } + +func (e *Embedded) APIServerReadyChan() <-chan struct{} { + if e.apiServerReady == nil { + panic("executor not bootstrapped") + } + return e.apiServerReady +} diff --git a/pkg/daemons/executor/etcd.go b/pkg/daemons/executor/etcd.go index dde8e496bf..4420599e41 100644 --- 
a/pkg/daemons/executor/etcd.go
+++ b/pkg/daemons/executor/etcd.go
@@ -13,8 +13,11 @@ import (
 "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
 )

+// Embedded is defined here so that we can use embedded.ETCD even when the rest
+// of the embedded executor is disabled by build flags
 type Embedded struct {
- nodeConfig *daemonconfig.Node
+ apiServerReady <-chan struct{}
+ nodeConfig     *daemonconfig.Node
 }

 func (e *Embedded) ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error {
diff --git a/pkg/daemons/executor/executor.go b/pkg/daemons/executor/executor.go
index d28b5e062b..eb028449a5 100644
--- a/pkg/daemons/executor/executor.go
+++ b/pkg/daemons/executor/executor.go
@@ -26,13 +26,14 @@ type Executor interface {
 KubeProxy(ctx context.Context, args []string) error
 APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error)
 APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error
- Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error
- ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error
+ Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error
+ ControllerManager(ctx context.Context, args []string) error
 CurrentETCDOptions() (InitialOptions, error)
 ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error
 CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error
 Containerd(ctx context.Context, node *daemonconfig.Node) error
 Docker(ctx context.Context, node *daemonconfig.Node) error
+ APIServerReadyChan() <-chan struct{}
 }

 type ETCDConfig struct {
@@ -154,12 +155,12 @@ func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) er
 return executor.APIServer(ctx, etcdReady, args)
 }

-func Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
- return executor.Scheduler(ctx, apiReady, args)
+func Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error {
+ return executor.Scheduler(ctx, nodeReady, args)
 }

-func ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
- return executor.ControllerManager(ctx, apiReady, args)
+func ControllerManager(ctx context.Context, args []string) error {
+ return executor.ControllerManager(ctx, args)
 }

 func CurrentETCDOptions() (InitialOptions, error) {
@@ -181,3 +182,7 @@ func Containerd(ctx context.Context, config *daemonconfig.Node) error {
 func Docker(ctx context.Context, config *daemonconfig.Node) error {
 return executor.Docker(ctx, config)
 }
+
+func APIServerReadyChan() <-chan struct{} {
+ return executor.APIServerReadyChan()
+}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 7e4e4a04f2..3ce4cf947f 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -17,6 +17,7 @@ import (
 "github.com/k3s-io/k3s/pkg/clientaccess"
 "github.com/k3s-io/k3s/pkg/daemons/config"
 "github.com/k3s-io/k3s/pkg/daemons/control"
+ "github.com/k3s-io/k3s/pkg/daemons/executor"
 "github.com/k3s-io/k3s/pkg/datadir"
 "github.com/k3s-io/k3s/pkg/deploy"
 "github.com/k3s-io/k3s/pkg/node"
@@ -44,7 +45,10 @@ func ResolveDataDir(dataDir string) (string, error) {
 return filepath.Join(dataDir, "server"), err
 }

-func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
+// PrepareServer prepares the server for operation. This includes setting paths
+// in ControlConfig, creating any certificates not extracted from the bootstrap
+// data, and binding request handlers.
+func PrepareServer(ctx context.Context, config *Config, cfg *cmds.Server) error { if err := setupDataDirAndChdir(&config.ControlConfig); err != nil { return err } @@ -53,6 +57,19 @@ func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error { return err } + if err := control.Prepare(ctx, &config.ControlConfig); err != nil { + return err + } + + config.ControlConfig.Runtime.Handler = handlers.NewHandler(ctx, &config.ControlConfig, cfg) + + return nil +} + +// StartServer starts whatever control-plane and etcd components are enabled by +// the current server configuration, runs startup hooks, starts controllers, +// and writes the admin kubeconfig. +func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error { if err := control.Server(ctx, &config.ControlConfig); err != nil { return pkgerrors.WithMessage(err, "starting kubernetes") } @@ -60,11 +77,10 @@ func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error { wg := &sync.WaitGroup{} wg.Add(len(config.StartupHooks)) - config.ControlConfig.Runtime.Handler = handlers.NewHandler(ctx, &config.ControlConfig, cfg) config.ControlConfig.Runtime.StartupHooksWg = wg shArgs := cmds.StartupHookArgs{ - APIServerReady: config.ControlConfig.Runtime.APIServerReady, + APIServerReady: executor.APIServerReadyChan(), KubeConfigSupervisor: config.ControlConfig.Runtime.KubeConfigSupervisor, Skips: config.ControlConfig.Skips, Disables: config.ControlConfig.Disables, @@ -87,7 +103,7 @@ func startOnAPIServerReady(ctx context.Context, config *Config) { select { case <-ctx.Done(): return - case <-config.ControlConfig.Runtime.APIServerReady: + case <-executor.APIServerReadyChan(): if err := runControllers(ctx, config); err != nil { logrus.Fatalf("failed to start controllers: %v", err) } diff --git a/pkg/util/api.go b/pkg/util/api.go index 0fdbc80197..14bb61734d 100644 --- a/pkg/util/api.go +++ b/pkg/util/api.go @@ -104,6 +104,22 @@ func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout t return nil } +// APIServerReadyChan wraps WaitForAPIServerReady, returning a channel that +// is closed when the apiserver is ready. If the apiserver does not become +// ready within the expected duration, a fatal error is raised. +func APIServerReadyChan(ctx context.Context, kubeConfig string, timeout time.Duration) <-chan struct{} { + ready := make(chan struct{}) + + go func() { + defer close(ready) + if err := WaitForAPIServerReady(ctx, kubeConfig, timeout); err != nil { + logrus.Fatalf("Failed to wait for API server to become ready: %v", err) + } + }() + + return ready +} + type genericAccessReviewRequest func(context.Context) (*authorizationv1.SubjectAccessReviewStatus, error) // WaitForRBACReady polls an AccessReview request until it returns an allowed response. If the user