Move apiserver ready wait into common channel
Splits server startup into prepare/start phases. The server's agent is now
started after the server is prepared, but before it is started. This allows
us to properly bootstrap the executor before starting the server components,
and to use the executor to provide a shared channel for waiting on apiserver
readiness.
This lets us replace four separate callers of WaitForAPIServerReady
with reads from a common ready channel.
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 529e748ac7)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/12044/head
parent 9989f5e69a
commit f3f7899521
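The pattern described in the commit message can be illustrated with a small, self-contained Go sketch (it is not part of this commit; readyExecutor, newReadyExecutor, and the simulated readiness poller are hypothetical stand-ins for the real executor and util.WaitForAPIServerReady). A single goroutine performs the readiness wait once and closes a shared channel; every component then blocks on that channel, which is roughly what executor.APIServerReadyChan() provides in the diff below.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// readyExecutor is a hypothetical stand-in for the k3s executor: it owns one
// channel that is closed once the apiserver readiness check succeeds, so all
// components can block on the same signal instead of polling on their own.
type readyExecutor struct {
    bootstrapOnce sync.Once
    ready         chan struct{}
}

func newReadyExecutor() *readyExecutor {
    return &readyExecutor{ready: make(chan struct{})}
}

// Bootstrap starts the single readiness watcher. waitForReady stands in for
// util.WaitForAPIServerReady in the real code; on failure the channel is
// simply left open here, where the real code logs a fatal error instead.
func (e *readyExecutor) Bootstrap(ctx context.Context, waitForReady func(context.Context) error) {
    e.bootstrapOnce.Do(func() {
        go func() {
            if err := waitForReady(ctx); err != nil {
                fmt.Println("apiserver never became ready:", err)
                return
            }
            close(e.ready)
        }()
    })
}

// APIServerReadyChan mirrors the accessor added in this commit: callers
// receive from the returned channel rather than running their own poller.
func (e *readyExecutor) APIServerReadyChan() <-chan struct{} {
    return e.ready
}

func main() {
    exec := newReadyExecutor()
    exec.Bootstrap(context.Background(), func(ctx context.Context) error {
        time.Sleep(100 * time.Millisecond) // pretend to poll /readyz
        return nil
    })

    var wg sync.WaitGroup
    for _, name := range []string{"scheduler", "controller-manager", "tunnel watches", "netpol"} {
        wg.Add(1)
        go func(name string) {
            defer wg.Done()
            <-exec.APIServerReadyChan() // one shared signal, no per-caller polling
            fmt.Println(name, "starting now that the apiserver is ready")
        }(name)
    }
    wg.Wait()
}
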
@@ -174,10 +174,27 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
return err
}

if err := util.WaitForAPIServerReady(ctx, nodeConfig.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil {
return pkgerrors.WithMessage(err, "failed to wait for apiserver ready")
}
go func() {
<-executor.APIServerReadyChan()
if err := startNetwork(ctx, nodeConfig); err != nil {
logrus.Fatalf("Failed to start networking: %v", err)
}

// By default, the server is responsible for notifying systemd
// On agent-only nodes, the agent will notify systemd
if notifySocket != "" {
logrus.Info(version.Program + " agent is up and running")
os.Setenv("NOTIFY_SOCKET", notifySocket)
systemd.SdNotify(true, "READY=1\n")
}
}()

return nil
}

// startNetwork updates the network annotations on the node, and starts flannel
// and the kube-router netpol controller, if enabled.
func startNetwork(ctx context.Context, nodeConfig *daemonconfig.Node) error {
// Use the kubelet kubeconfig to update annotations on the local node
kubeletClient, err := util.GetClientSet(nodeConfig.AgentConfig.KubeConfigKubelet)
if err != nil {

@@ -200,16 +217,7 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
}
}

// By default, the server is responsible for notifying systemd
// On agent-only nodes, the agent will notify systemd
if notifySocket != "" {
logrus.Info(version.Program + " agent is up and running")
os.Setenv("NOTIFY_SOCKET", notifySocket)
systemd.SdNotify(true, "READY=1\n")
}

<-ctx.Done()
return ctx.Err()
return nil
}

// getConntrackConfig uses the kube-proxy code to parse the user-provided kube-proxy-arg values, and

@@ -258,8 +266,7 @@ func getConntrackConfig(nodeConfig *daemonconfig.Node) (*kubeproxyconfig.KubePro

// RunStandalone bootstraps the executor, but does not run the kubelet or containerd.
// This allows other bits of code that expect the executor to be set up properly to function
// even when the agent is disabled. It will only return in case of error or context
// cancellation.
// even when the agent is disabled.
func RunStandalone(ctx context.Context, cfg cmds.Agent) error {
proxy, err := createProxyAndValidateToken(ctx, &cfg)
if err != nil {

@@ -298,13 +305,11 @@ func RunStandalone(ctx context.Context, cfg cmds.Agent) error {
}
}

<-ctx.Done()
return ctx.Err()
return nil
}

// Run sets up cgroups, configures the LB proxy, and triggers startup
// of containerd and kubelet. It will only return in case of error or context
// cancellation.
// of containerd and kubelet.
func Run(ctx context.Context, cfg cmds.Agent) error {
if err := cgroups.Validate(); err != nil {
return err

@@ -17,6 +17,7 @@ import (
"github.com/k3s-io/k3s/pkg/agent/proxy"
"github.com/k3s-io/k3s/pkg/clientaccess"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/remotedialer"

@@ -94,11 +95,9 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
startTime: time.Now().Truncate(time.Second),
}

apiServerReady := make(chan struct{})
rbacReady := make(chan struct{})
go func() {
if err := util.WaitForAPIServerReady(ctx, config.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil {
logrus.Fatalf("Tunnel watches failed to wait for apiserver ready: %v", err)
}
<-executor.APIServerReadyChan()
if err := util.WaitForRBACReady(ctx, config.AgentConfig.KubeConfigK3sController, util.DefaultAPIServerReadyTimeout, authorizationv1.ResourceAttributes{
Namespace: metav1.NamespaceDefault,
Verb:      "list",

@@ -107,14 +106,14 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
logrus.Fatalf("Tunnel watches failed to wait for RBAC: %v", err)
}

close(apiServerReady)
close(rbacReady)
}()

// We don't need to run the tunnel authorizer if the container runtime endpoint is /dev/null,
// signifying that this is an agentless server that will not register a node.
if config.ContainerRuntimeEndpoint != "/dev/null" {
// Allow the kubelet port, as published via our node object.
go tunnel.setKubeletPort(ctx, apiServerReady)
go tunnel.setKubeletPort(ctx, rbacReady)

switch tunnel.mode {
case daemonconfig.EgressSelectorModeCluster:

@@ -122,7 +121,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
tunnel.clusterAuth(config)
case daemonconfig.EgressSelectorModePod:
// In Pod mode, we watch pods assigned to this node, and allow their addresses, as well as ports used by containers with host network.
go tunnel.watchPods(ctx, apiServerReady, config)
go tunnel.watchPods(ctx, rbacReady, config)
}
}

@@ -165,7 +164,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er

wg := &sync.WaitGroup{}

go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, config, proxy)
go tunnel.watchEndpoints(ctx, rbacReady, wg, tlsConfig, config, proxy)

wait := make(chan int, 1)
go func() {

@@ -184,8 +183,8 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
}

// setKubeletPort retrieves the configured kubelet port from our node object
func (a *agentTunnel) setKubeletPort(ctx context.Context, apiServerReady <-chan struct{}) {
<-apiServerReady
func (a *agentTunnel) setKubeletPort(ctx context.Context, rbacReady <-chan struct{}) {
<-rbacReady

wait.PollUntilContextTimeout(ctx, time.Second, util.DefaultAPIServerReadyTimeout, true, func(ctx context.Context) (bool, error) {
var readyTime metav1.Time

@@ -231,7 +230,7 @@ func (a *agentTunnel) clusterAuth(config *daemonconfig.Node) {

// watchPods watches for pods assigned to this node, adding their IPs to the CIDR list.
// If the pod uses host network, we instead add the
func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struct{}, config *daemonconfig.Node) {
func (a *agentTunnel) watchPods(ctx context.Context, rbacReady <-chan struct{}, config *daemonconfig.Node) {
for _, ip := range config.AgentConfig.NodeIPs {
if cidr, err := util.IPToIPNet(ip); err == nil {
logrus.Infof("Tunnel authorizer adding Node IP %s", cidr)

@@ -239,7 +238,7 @@ func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struc
}
}

<-apiServerReady
<-rbacReady

nodeName := os.Getenv("NODE_NAME")
pods := a.client.CoreV1().Pods(metav1.NamespaceNone)

@@ -308,11 +307,11 @@ func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struc
// WatchEndpoints attempts to create tunnels to all supervisor addresses. Once the
// apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
// and go from the cluster.
func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, node *daemonconfig.Node, proxy proxy.Proxy) {
func (a *agentTunnel) watchEndpoints(ctx context.Context, rbacReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, node *daemonconfig.Node, proxy proxy.Proxy) {
syncProxyAddresses := a.getProxySyncer(ctx, wg, tlsConfig, proxy)
refreshFromSupervisor := getAPIServersRequester(node, proxy, syncProxyAddresses)

<-apiServerReady
<-rbacReady

endpoints := a.client.CoreV1().Endpoints(metav1.NamespaceDefault)
fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()

@@ -130,5 +130,10 @@ func Run(ctx *cli.Context) error {
return https.Start(ctx, nodeConfig, nil)
}

return agent.Run(contextCtx, cfg)
if err := agent.Run(contextCtx, cfg); err != nil {
return err
}

<-contextCtx.Done()
return contextCtx.Err()
}

@@ -18,6 +18,7 @@ import (
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/datadir"
"github.com/k3s-io/k3s/pkg/etcd"
k3smetrics "github.com/k3s-io/k3s/pkg/metrics"

@@ -513,28 +514,10 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont

ctx := signals.SetupSignalContext()

if err := server.StartServer(ctx, &serverConfig, cfg); err != nil {
if err := server.PrepareServer(ctx, &serverConfig, cfg); err != nil {
return err
}

go cmds.WriteCoverage(ctx)

go func() {
if !serverConfig.ControlConfig.DisableAPIServer {
<-serverConfig.ControlConfig.Runtime.APIServerReady
logrus.Info("Kube API server is now running")
serverConfig.ControlConfig.Runtime.StartupHooksWg.Wait()
}
if !serverConfig.ControlConfig.DisableETCD {
<-serverConfig.ControlConfig.Runtime.ETCDReady
logrus.Info("ETCD server is now running")
}

logrus.Info(version.Program + " is up and running")
os.Setenv("NOTIFY_SOCKET", notifySocket)
systemd.SdNotify(true, "READY=1\n")
}()

url := fmt.Sprintf("https://%s:%d", serverConfig.ControlConfig.BindAddressOrLoopback(false, true), serverConfig.ControlConfig.SupervisorPort)
token, err := clientaccess.FormatToken(serverConfig.ControlConfig.Runtime.AgentToken, serverConfig.ControlConfig.Runtime.ServerCA)
if err != nil {

@@ -604,10 +587,38 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont

if cfg.DisableAgent {
agentConfig.ContainerRuntimeEndpoint = "/dev/null"
return agent.RunStandalone(ctx, agentConfig)
if err := agent.RunStandalone(ctx, agentConfig); err != nil {
return err
}
} else {
if err := agent.Run(ctx, agentConfig); err != nil {
return err
}
}

return agent.Run(ctx, agentConfig)
go cmds.WriteCoverage(ctx)

go func() {
if !serverConfig.ControlConfig.DisableETCD {
<-serverConfig.ControlConfig.Runtime.ETCDReady
logrus.Info("ETCD server is now running")
}
if !serverConfig.ControlConfig.DisableAPIServer {
<-executor.APIServerReadyChan()
logrus.Info("Kube API server is now running")
serverConfig.ControlConfig.Runtime.StartupHooksWg.Wait()
}
logrus.Info(version.Program + " is up and running")
os.Setenv("NOTIFY_SOCKET", notifySocket)
systemd.SdNotify(true, "READY=1\n")
}()

if err := server.StartServer(ctx, &serverConfig, cfg); err != nil {
return err
}

<-ctx.Done()
return ctx.Err()
}

// validateNetworkConfig ensures that the network configuration values make sense.

@@ -27,16 +27,17 @@ type Cluster struct {
cnFilterFunc func(...string) []string
}

// Start creates the dynamic tls listener, http request handler,
// handles starting and writing/reading bootstrap data, and returns a channel
// ListenAndServe creates the dynamic tls listener, registers http request
// handlers, and starts the supervisor API server loop.
func (c *Cluster) ListenAndServe(ctx context.Context) error {
// Set up the dynamiclistener and http request handlers
return c.initClusterAndHTTPS(ctx)
}

// Start handles writing/reading bootstrap data, and returns a channel
// that will be closed when datastore is ready. If embedded etcd is in use,
// a secondary call to Cluster.save is made.
func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
// Set up the dynamiclistener and http request handlers
if err := c.initClusterAndHTTPS(ctx); err != nil {
return nil, pkgerrors.WithMessage(err, "init cluster datastore and https")
}

if c.config.DisableETCD {
ready := make(chan struct{})
defer close(ready)

@@ -255,6 +255,7 @@ type Control struct {
SANSecurity bool
PrivateIP string
Runtime *ControlRuntime `json:"-"`
Cluster Cluster `json:"-"`
}

// BindAddressOrLoopback returns an IPv4 or IPv6 address suitable for embedding in

@@ -312,7 +313,6 @@ type ControlRuntimeBootstrap struct {
type ControlRuntime struct {
ControlRuntimeBootstrap

APIServerReady <-chan struct{}
ContainerRuntimeReady <-chan struct{}
ETCDReady <-chan struct{}
StartupHooksWg *sync.WaitGroup

@@ -381,6 +381,12 @@ type ControlRuntime struct {
EtcdConfig endpoint.ETCDConfig
}

type Cluster interface {
Bootstrap(ctx context.Context, reset bool) error
ListenAndServe(ctx context.Context) error
Start(ctx context.Context) (<-chan struct{}, error)
}

type CoreFactory interface {
Core() core.Interface
Sync(ctx context.Context) error

@@ -36,7 +36,9 @@ import (
_ "k8s.io/component-base/metrics/prometheus/restclient"
)

func Server(ctx context.Context, cfg *config.Control) error {
// Prepare loads bootstrap data from the datastore and sets up the initial
// tunnel server request handler and stub authenticator.
func Prepare(ctx context.Context, cfg *config.Control) error {
rand.Seed(time.Now().UTC().UnixNano())

logsapi.ReapplyHandling = logsapi.ReapplyHandlingIgnoreUnchanged

@@ -62,6 +64,18 @@ func Server(ctx context.Context, cfg *config.Control) error {
}
cfg.Runtime.Authenticator = auth

return nil
}

// Server starts the apiserver and whatever other control-plane components are
// not disabled on this node.
func Server(ctx context.Context, cfg *config.Control) error {
if ready, err := cfg.Cluster.Start(ctx); err != nil {
return pkgerrors.WithMessage(err, "failed to start cluster")
} else {
cfg.Runtime.ETCDReady = ready
}

if !cfg.DisableAPIServer {
go waitForAPIServerHandlers(ctx, cfg.Runtime)

@@ -70,12 +84,6 @@ func Server(ctx context.Context, cfg *config.Control) error {
}
}

// Wait for an apiserver to become available before starting additional controllers,
// even if we're not running an apiserver locally.
if err := waitForAPIServerInBackground(ctx, cfg.Runtime); err != nil {
return err
}

if !cfg.DisableScheduler {
if err := scheduler(ctx, cfg); err != nil {
return err

@@ -139,7 +147,7 @@ func controllerManager(ctx context.Context, cfg *config.Control) error {
args := config.GetArgs(argsMap, cfg.ExtraControllerArgs)
logrus.Infof("Running kube-controller-manager %s", config.ArgString(args))

return executor.ControllerManager(ctx, cfg.Runtime.APIServerReady, args)
return executor.ControllerManager(ctx, args)
}

func scheduler(ctx context.Context, cfg *config.Control) error {

@@ -165,22 +173,12 @@ func scheduler(ctx context.Context, cfg *config.Control) error {

args := config.GetArgs(argsMap, cfg.ExtraSchedulerAPIArgs)

schedulerNodeReady := make(chan struct{})
nodeReady := make(chan struct{})

go func() {
defer close(schedulerNodeReady)
defer close(nodeReady)

apiReadyLoop:
for {
select {
case <-ctx.Done():
return
case <-cfg.Runtime.APIServerReady:
break apiReadyLoop
case <-time.After(30 * time.Second):
logrus.Infof("Waiting for API server to become available to start kube-scheduler")
}
}
<-executor.APIServerReadyChan()

// If we're running the embedded cloud controller, wait for it to untaint at least one
// node (usually, the local node) before starting the scheduler to ensure that it

@@ -194,7 +192,7 @@ func scheduler(ctx context.Context, cfg *config.Control) error {
}()

logrus.Infof("Running kube-scheduler %s", config.ArgString(args))
return executor.Scheduler(ctx, schedulerNodeReady, args)
return executor.Scheduler(ctx, nodeReady, args)
}

func apiServer(ctx context.Context, cfg *config.Control) error {

@@ -287,6 +285,9 @@ func defaults(config *config.Control) {
}
}

// prepare sets up the server data-dir, calls deps.GenServerDeps to
// set paths, extracts the cluster bootstrap data to the
// configured paths, and starts the supervisor listener.
func prepare(ctx context.Context, config *config.Control) error {
defaults(config)

@@ -306,8 +307,8 @@ func prepare(ctx context.Context, config *config.Control) error {

deps.CreateRuntimeCertFiles(config)

cluster := cluster.New(config)
if err := cluster.Bootstrap(ctx, config.ClusterReset); err != nil {
config.Cluster = cluster.New(config)
if err := config.Cluster.Bootstrap(ctx, config.ClusterReset); err != nil {
return pkgerrors.WithMessage(err, "failed to bootstrap cluster data")
}

@@ -315,10 +316,8 @@ func prepare(ctx context.Context, config *config.Control) error {
return pkgerrors.WithMessage(err, "failed to generate server dependencies")
}

if ready, err := cluster.Start(ctx); err != nil {
return pkgerrors.WithMessage(err, "failed to start cluster")
} else {
config.Runtime.ETCDReady = ready
if err := config.Cluster.ListenAndServe(ctx); err != nil {
return pkgerrors.WithMessage(err, "failed to start supervisor listener")
}

return nil

@@ -385,17 +384,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error {
go func() {
defer close(ccmRBACReady)

apiReadyLoop:
for {
select {
case <-ctx.Done():
return
case <-cfg.Runtime.APIServerReady:
break apiReadyLoop
case <-time.After(30 * time.Second):
logrus.Infof("Waiting for API server to become available to start cloud-controller-manager")
}
}
<-executor.APIServerReadyChan()

logrus.Infof("Waiting for cloud-controller-manager privileges to become available")
for {

@@ -438,43 +427,6 @@ func waitForAPIServerHandlers(ctx context.Context, runtime *config.ControlRuntim
runtime.APIServer = handler
}

func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRuntime) error {
done := make(chan struct{})
runtime.APIServerReady = done

go func() {
defer close(done)

etcdLoop:
for {
select {
case <-ctx.Done():
return
case <-runtime.ETCDReady:
break etcdLoop
case <-time.After(30 * time.Second):
logrus.Infof("Waiting for etcd server to become available")
}
}

logrus.Infof("Waiting for API server to become available")
for {
select {
case <-ctx.Done():
return
case err := <-promise(func() error { return util.WaitForAPIServerReady(ctx, runtime.KubeConfigSupervisor, 30*time.Second) }):
if err != nil {
logrus.Infof("Waiting for API server to become available")
continue
}
return
}
}
}()

return nil
}

func promise(f func() error) <-chan error {
c := make(chan error, 1)
go func() {

@@ -487,7 +439,6 @@ func promise(f func() error) <-chan error {
// waitForUntaintedNode watches nodes, waiting to find one not tainted as
// uninitialized by the external cloud provider.
func waitForUntaintedNode(ctx context.Context, kubeConfig string) error {

restConfig, err := util.GetRESTConfig(kubeConfig)
if err != nil {
return err

@@ -43,6 +43,7 @@ func init() {
}

func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
e.apiServerReady = util.APIServerReadyChan(ctx, nodeConfig.AgentConfig.KubeConfigK3sController, util.DefaultAPIServerReadyTimeout)
e.nodeConfig = nodeConfig

go func() {

@@ -72,17 +73,12 @@ func (e *Embedded) Kubelet(ctx context.Context, args []string) error {
command.SetArgs(args)

go func() {
<-e.APIServerReadyChan()
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", string(debug.Stack())).Fatalf("kubelet panic: %v", err)
}
}()
// The embedded executor doesn't need the kubelet to come up to host any components, and
// having it come up on servers before the apiserver is available causes a lot of log spew.
// Agents don't have access to the server's apiReady channel, so just wait directly.
if err := util.WaitForAPIServerReady(ctx, e.nodeConfig.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil {
logrus.Fatalf("Kubelet failed to wait for apiserver ready: %v", err)
}
err := command.ExecuteContext(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
logrus.Errorf("kubelet exited: %v", err)

@@ -99,6 +95,7 @@ func (e *Embedded) KubeProxy(ctx context.Context, args []string) error {
command.SetArgs(daemonconfig.GetArgs(platformKubeProxyArgs(e.nodeConfig), args))

go func() {
<-e.APIServerReadyChan()
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", string(debug.Stack())).Fatalf("kube-proxy panic: %v", err)

@@ -142,12 +139,13 @@ func (*Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args
return nil
}

func (e *Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
func (e *Embedded) Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error {
command := sapp.NewSchedulerCommand()
command.SetArgs(args)

go func() {
<-apiReady
<-e.APIServerReadyChan()
<-nodeReady
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", string(debug.Stack())).Fatalf("scheduler panic: %v", err)

@@ -164,12 +162,12 @@ func (e *Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args
return nil
}

func (*Embedded) ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
func (e *Embedded) ControllerManager(ctx context.Context, args []string) error {
command := cmapp.NewControllerManagerCommand()
command.SetArgs(args)

go func() {
<-apiReady
<-e.APIServerReadyChan()
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", string(debug.Stack())).Fatalf("controller-manager panic: %v", err)

@@ -243,3 +241,10 @@ func (e *Embedded) Containerd(ctx context.Context, cfg *daemonconfig.Node) error
func (e *Embedded) Docker(ctx context.Context, cfg *daemonconfig.Node) error {
return cridockerd.Run(ctx, cfg)
}

func (e *Embedded) APIServerReadyChan() <-chan struct{} {
if e.apiServerReady == nil {
panic("executor not bootstrapped")
}
return e.apiServerReady
}

@@ -13,8 +13,11 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
)

// Embedded is defined here so that we can use embedded.ETCD even when the rest
// of the embedded execututor is disabled by build flags
type Embedded struct {
nodeConfig *daemonconfig.Node
apiServerReady <-chan struct{}
nodeConfig *daemonconfig.Node
}

func (e *Embedded) ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error {

@@ -26,13 +26,14 @@ type Executor interface {
KubeProxy(ctx context.Context, args []string) error
APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error)
APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error
Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error
ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error
Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error
ControllerManager(ctx context.Context, args []string) error
CurrentETCDOptions() (InitialOptions, error)
ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error
CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error
Containerd(ctx context.Context, node *daemonconfig.Node) error
Docker(ctx context.Context, node *daemonconfig.Node) error
APIServerReadyChan() <-chan struct{}
}

type ETCDConfig struct {

@@ -154,12 +155,12 @@ func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) er
return executor.APIServer(ctx, etcdReady, args)
}

func Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
return executor.Scheduler(ctx, apiReady, args)
func Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error {
return executor.Scheduler(ctx, nodeReady, args)
}

func ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
return executor.ControllerManager(ctx, apiReady, args)
func ControllerManager(ctx context.Context, args []string) error {
return executor.ControllerManager(ctx, args)
}

func CurrentETCDOptions() (InitialOptions, error) {

@@ -181,3 +182,7 @@ func Containerd(ctx context.Context, config *daemonconfig.Node) error {
func Docker(ctx context.Context, config *daemonconfig.Node) error {
return executor.Docker(ctx, config)
}

func APIServerReadyChan() <-chan struct{} {
return executor.APIServerReadyChan()
}

@@ -17,6 +17,7 @@ import (
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/datadir"
"github.com/k3s-io/k3s/pkg/deploy"
"github.com/k3s-io/k3s/pkg/node"

@@ -44,7 +45,10 @@ func ResolveDataDir(dataDir string) (string, error) {
return filepath.Join(dataDir, "server"), err
}

func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
// PrepareServer prepares the server for operation. This includes setting paths
// in ControlConfig, creating any certificates not extracted from the bootstrap
// data, and binding request handlers.
func PrepareServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
if err := setupDataDirAndChdir(&config.ControlConfig); err != nil {
return err
}

@@ -53,6 +57,19 @@ func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
return err
}

if err := control.Prepare(ctx, &config.ControlConfig); err != nil {
return err
}

config.ControlConfig.Runtime.Handler = handlers.NewHandler(ctx, &config.ControlConfig, cfg)

return nil
}

// StartServer starts whatever control-plane and etcd components are enabled by
// the current server configuration, runs startup hooks, starts controllers,
// and writes the admin kubeconfig.
func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
if err := control.Server(ctx, &config.ControlConfig); err != nil {
return pkgerrors.WithMessage(err, "starting kubernetes")
}

@@ -60,11 +77,10 @@ func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
wg := &sync.WaitGroup{}
wg.Add(len(config.StartupHooks))

config.ControlConfig.Runtime.Handler = handlers.NewHandler(ctx, &config.ControlConfig, cfg)
config.ControlConfig.Runtime.StartupHooksWg = wg

shArgs := cmds.StartupHookArgs{
APIServerReady: config.ControlConfig.Runtime.APIServerReady,
APIServerReady: executor.APIServerReadyChan(),
KubeConfigSupervisor: config.ControlConfig.Runtime.KubeConfigSupervisor,
Skips: config.ControlConfig.Skips,
Disables: config.ControlConfig.Disables,

@@ -87,7 +103,7 @@ func startOnAPIServerReady(ctx context.Context, config *Config) {
select {
case <-ctx.Done():
return
case <-config.ControlConfig.Runtime.APIServerReady:
case <-executor.APIServerReadyChan():
if err := runControllers(ctx, config); err != nil {
logrus.Fatalf("failed to start controllers: %v", err)
}

@@ -104,6 +104,22 @@ func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout t
return nil
}

// APIServerReadyChan wraps WaitForAPIServerReady, returning a channel that
// is closed when the apiserver is ready. If the apiserver does not become
// ready within the expected duration, a fatal error is raised.
func APIServerReadyChan(ctx context.Context, kubeConfig string, timeout time.Duration) <-chan struct{} {
ready := make(chan struct{})

go func() {
defer close(ready)
if err := WaitForAPIServerReady(ctx, kubeConfig, timeout); err != nil {
logrus.Fatalf("Failed to wait for API server to become ready: %v", err)
}
}()

return ready
}

type genericAccessReviewRequest func(context.Context) (*authorizationv1.SubjectAccessReviewStatus, error)

// WaitForRBACReady polls an AccessReview request until it returns an allowed response. If the user