diff --git a/pkg/cluster/bootstrap.go b/pkg/cluster/bootstrap.go
new file mode 100644
index 0000000000..848cf44e66
--- /dev/null
+++ b/pkg/cluster/bootstrap.go
@@ -0,0 +1,123 @@
+package cluster
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+
+	"github.com/rancher/k3s/pkg/bootstrap"
+	"github.com/rancher/k3s/pkg/clientaccess"
+	"github.com/rancher/k3s/pkg/version"
+	"github.com/sirupsen/logrus"
+)
+
+// Bootstrap assigns the managed datastore driver, if any, and loads the
+// cluster bootstrap data unless this node has already been bootstrapped.
+func (c *Cluster) Bootstrap(ctx context.Context) error {
+	if err := c.assignManagedDriver(ctx); err != nil {
+		return err
+	}
+
+	runBootstrap, err := c.shouldBootstrapLoad()
+	if err != nil {
+		return err
+	}
+	c.shouldBootstrap = runBootstrap
+
+	if runBootstrap {
+		if err := c.bootstrap(ctx); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// shouldBootstrapLoad reports whether bootstrap data still has to be loaded.
+// With a managed datastore it enables HTTP bootstrap and, when a join URL is
+// set, validates the token and stores the resulting client access info.
+func (c *Cluster) shouldBootstrapLoad() (bool, error) {
+	if c.managedDB != nil {
+		c.runtime.HTTPBootstrap = true
+		if c.config.JoinURL == "" {
+			return false, nil
+		}
+
+		token, err := clientaccess.NormalizeAndValidateTokenForUser(c.config.JoinURL, c.config.Token, "server")
+		if err != nil {
+			return false, err
+		}
+
+		info, err := clientaccess.ParseAndValidateToken(c.config.JoinURL, token)
+		if err != nil {
+			return false, err
+		}
+		c.clientAccessInfo = info
+	}
+
+	stamp := c.bootstrapStamp()
+	if _, err := os.Stat(stamp); err == nil {
+		logrus.Info("Cluster bootstrap already complete")
+		return false, nil
+	}
+
+	if c.managedDB != nil && c.config.Token == "" {
+		return false, fmt.Errorf("%s_TOKEN is required to join a cluster", version.ProgramUpper)
+	}
+
+	return true, nil
+}
+
+// bootstrapped ensures the stamp directory exists and creates the bootstrap
+// stamp file if it is not already present.
+func (c *Cluster) bootstrapped() error {
+	if err := os.MkdirAll(filepath.Dir(c.bootstrapStamp()), 0700); err != nil {
+		return err
+	}
+
+	if _, err := os.Stat(c.bootstrapStamp()); err == nil {
+		return nil
+	}
+
+	f, err := os.Create(c.bootstrapStamp())
+	if err != nil {
+		return err
+	}
+
+	return f.Close()
+}
+
+// httpBootstrap fetches the server-bootstrap payload with the stored client
+// access info and reads it into the runtime bootstrap data.
+func (c *Cluster) httpBootstrap() error {
+	content, err := clientaccess.Get("/v1-"+version.Program+"/server-bootstrap", c.clientAccessInfo)
+	if err != nil {
+		return err
+	}
+
+	return bootstrap.Read(bytes.NewBuffer(content), &c.runtime.ControlRuntimeBootstrap)
+}
+
+// bootstrap marks the node as joining and loads the bootstrap data over HTTP
+// when HTTPBootstrap is set, or through storageBootstrap otherwise.
+func (c *Cluster) bootstrap(ctx context.Context) error {
+	c.joining = true
+
+	if c.runtime.HTTPBootstrap {
+		return c.httpBootstrap()
+	}
+
+	if err := c.storageBootstrap(ctx); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// bootstrapStamp returns the stamp file path inside the data directory; the
+// file name embeds keyHash of the configured token.
+func (c *Cluster) bootstrapStamp() string {
+	return filepath.Join(c.config.DataDir, "db/joined-"+keyHash(c.config.Token))
+}
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 3e9b49d008..09e4398ef1 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/pkg/errors"
 	"github.com/rancher/k3s/pkg/clientaccess"
+	"github.com/rancher/k3s/pkg/cluster/managed"
 	"github.com/rancher/k3s/pkg/daemons/config"
 	"github.com/rancher/kine/pkg/client"
 	"github.com/rancher/kine/pkg/endpoint"
@@ -15,8 +16,8 @@ type Cluster struct {
 	clientAccessInfo *clientaccess.Info
 	config           *config.Control
 	runtime          *config.ControlRuntime
-	db               interface{}
-	runJoin          bool
+	managedDB        managed.Driver
+	shouldBootstrap  bool
 	storageStarted   bool
 	etcdConfig       endpoint.ETCDConfig
 	joining          bool
@@ -24,34 +25,40 @@ type Cluster struct {
 	storageClient    client.Client
 }
 
-func (c *Cluster) Start(ctx context.Context) error {
-	if err := c.startClusterAndHTTPS(ctx); err != nil {
-		return errors.Wrap(err, "start cluster and https")
+// Start sets up the cluster listener, starts the managed datastore, records
+// the bootstrap stamp when bootstrap ran, and starts storage. The returned
+// channel is closed by testClusterDB; it is nil whenever an error is returned.
+func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
+	if err := c.initClusterAndHTTPS(ctx); err != nil {
+		return nil, errors.Wrap(err, "start cluster and https")
 	}
 
-	if c.runJoin {
-		if err := c.postJoin(ctx); err != nil {
-			return errors.Wrap(err, "post join")
-		}
+	if err := c.start(ctx); err != nil {
+		return nil, errors.Wrap(err, "start managed database")
 	}
 
-	if err := c.testClusterDB(ctx); err != nil {
-		return err
+	ready, err := c.testClusterDB(ctx)
+	if err != nil {
+		return nil, err
 	}
 
 	if c.saveBootstrap {
 		if err := c.save(ctx); err != nil {
-			return err
+			return nil, err
 		}
 	}
 
-	if c.runJoin {
-		if err := c.joined(); err != nil {
-			return err
+	if c.shouldBootstrap {
+		if err := c.bootstrapped(); err != nil {
+			return nil, err
 		}
 	}
 
-	return c.startStorage(ctx)
+	if err := c.startStorage(ctx); err != nil {
+		return nil, err
+	}
+
+	return ready, nil
 }
 
 func (c *Cluster) startStorage(ctx context.Context) error {
diff --git a/pkg/cluster/https.go b/pkg/cluster/https.go
index d842f3be5e..5dea465c27 100644
--- a/pkg/cluster/https.go
+++ b/pkg/cluster/https.go
@@ -42,7 +42,7 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler,
 	})
 }
 
-func (c *Cluster) startClusterAndHTTPS(ctx context.Context) error {
+func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error {
 	l, handler, err := c.newListener(ctx)
 	if err != nil {
 		return err
diff --git a/pkg/cluster/join.go b/pkg/cluster/join.go
deleted file mode 100644
index 06da1670af..0000000000
--- a/pkg/cluster/join.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package cluster
-
-import (
-	"bytes"
-	"context"
-	"fmt"
-	"os"
-	"path/filepath"
-
-	"github.com/rancher/k3s/pkg/bootstrap"
-	"github.com/rancher/k3s/pkg/clientaccess"
-	"github.com/rancher/k3s/pkg/version"
-	"github.com/sirupsen/logrus"
-)
-
-func (c *Cluster) Join(ctx context.Context) error {
-	runJoin, err := c.shouldJoin()
-	if err != nil {
-		return err
-	}
-	c.runJoin = runJoin
-
-	if runJoin {
-		if err := c.join(ctx); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (c *Cluster) shouldJoin() (bool, error) {
-	dqlite := c.dqliteEnabled()
-	if dqlite {
-		c.runtime.HTTPBootstrap = true
-		if c.config.JoinURL == "" {
-			return false, nil
-		}
-	}
-
-	stamp := c.joinStamp()
-	if _, err := os.Stat(stamp); err == nil {
-		logrus.Info("Cluster bootstrap already complete")
-		return false, nil
-	}
-
-	if dqlite && c.config.Token == "" {
-		return false, fmt.Errorf(version.ProgramUpper + "_TOKEN is required to join a cluster")
-	}
-
-	return true, nil
-}
-
-func (c *Cluster) joined() error {
-	if err := os.MkdirAll(filepath.Dir(c.joinStamp()), 0700); err != nil {
-		return err
-	}
-
-	if _, err := os.Stat(c.joinStamp()); err == nil {
-		return nil
-	}
-
-	f, err := os.Create(c.joinStamp())
-	if err != nil {
-		return err
-	}
-
-	return f.Close()
-}
-
-func (c *Cluster) httpJoin() error {
-	token, err := clientaccess.NormalizeAndValidateTokenForUser(c.config.JoinURL, c.config.Token, "server")
-	if err != nil {
-		return err
-	}
-
-	info, err := clientaccess.ParseAndValidateToken(c.config.JoinURL, token)
-	if err != nil {
-		return err
-	}
-	c.clientAccessInfo = info
-
-	content, err := clientaccess.Get("/v1-"+version.Program+"/server-bootstrap", info)
-	if err != nil {
-		return err
-	}
-
-	return bootstrap.Read(bytes.NewBuffer(content), &c.runtime.ControlRuntimeBootstrap)
-}
-
-func (c *Cluster) join(ctx context.Context) error {
-	c.joining = true
-
-	if c.runtime.HTTPBootstrap {
-		return c.httpJoin()
-	}
-
-	if err := c.storageJoin(ctx); err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func (c *Cluster) joinStamp() string {
-	return filepath.Join(c.config.DataDir, "db/joined-"+keyHash(c.config.Token))
-}
diff --git a/pkg/cluster/managed.go b/pkg/cluster/managed.go
new file mode 100644
index 0000000000..a89fb85ad0
--- /dev/null
+++ b/pkg/cluster/managed.go
@@ -0,0 +1,111 @@
+package cluster
+
+import (
+	"context"
+	"net"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/rancher/k3s/pkg/cluster/managed"
+	"github.com/rancher/kine/pkg/endpoint"
+	"github.com/sirupsen/logrus"
+)
+
+// testClusterDB returns a channel that is closed once the managed datastore
+// passes its connection test. Without a managed datastore the channel is
+// returned already closed. The test is retried every five seconds in the
+// background; the channel is also closed if ctx is cancelled first, so a
+// closed channel alone does not prove the datastore is reachable.
+func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) {
+	result := make(chan struct{})
+	if c.managedDB == nil {
+		close(result)
+		return result, nil
+	}
+
+	go func() {
+		defer close(result)
+		for {
+			if err := c.managedDB.Test(ctx); err != nil {
+				logrus.Infof("Failed to test data store connection: %v", err)
+			} else {
+				logrus.Infof("Data store connection OK")
+				return
+			}
+
+			select {
+			case <-time.After(5 * time.Second):
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return result, nil
+}
+
+// start resets the managed datastore when ClusterReset is set and starts it
+// otherwise. It is a no-op without a managed datastore.
+func (c *Cluster) start(ctx context.Context) error {
+	if c.managedDB == nil {
+		return nil
+	}
+
+	if c.config.ClusterReset {
+		return c.managedDB.Reset(ctx)
+	}
+
+	return c.managedDB.Start(ctx, c.clientAccessInfo)
+}
+
+// initClusterDB registers the managed datastore with the given listener and
+// handler. If the configured endpoint does not already use the driver's
+// scheme, the datastore config is replaced with the driver's endpoint name.
+func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
+	if c.managedDB == nil {
+		return l, handler, nil
+	}
+
+	if !strings.HasPrefix(c.config.Datastore.Endpoint, c.managedDB.EndpointName()+"://") {
+		c.config.Datastore = endpoint.Config{
+			Endpoint: c.managedDB.EndpointName(),
+		}
+	}
+
+	return c.managedDB.Register(ctx, c.config, l, handler)
+}
+
+// assignManagedDriver picks the managed datastore driver for this node: first
+// a driver that reports itself initialized, then one matching the configured
+// endpoint type, then the default driver when no endpoint is set and the node
+// is initializing or joining a cluster. managedDB stays nil if none applies.
+func (c *Cluster) assignManagedDriver(ctx context.Context) error {
+	for _, driver := range managed.Registered() {
+		if ok, err := driver.IsInitialized(ctx, c.config); err != nil {
+			return err
+		} else if ok {
+			c.managedDB = driver
+			return nil
+		}
+	}
+
+	endpointType := strings.SplitN(c.config.Datastore.Endpoint, ":", 2)[0]
+	for _, driver := range managed.Registered() {
+		if endpointType == driver.EndpointName() {
+			c.managedDB = driver
+			return nil
+		}
+	}
+
+	if c.config.Datastore.Endpoint == "" && (c.config.ClusterInit || (c.config.Token != "" && c.config.JoinURL != "")) {
+		for _, driver := range managed.Registered() {
+			if driver.EndpointName() == managed.Default() {
+				c.managedDB = driver
+				return nil
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/cluster/managed/drivers.go b/pkg/cluster/managed/drivers.go
new file mode 100644
index 0000000000..8ee78b2aff
--- /dev/null
+++ b/pkg/cluster/managed/drivers.go
@@ -0,0 +1,45 @@
+package managed
+
+import (
+	"context"
+	"net"
+	"net/http"
+
+	"github.com/rancher/k3s/pkg/clientaccess"
+	"github.com/rancher/k3s/pkg/daemons/config"
+)
+
+var (
+	defaultDriver string
+	drivers       []Driver
+)
+
+// Driver is the set of operations the cluster package requires from a
+// managed datastore implementation.
+type Driver interface {
+	IsInitialized(ctx context.Context, config *config.Control) (bool, error)
+	Register(ctx context.Context, config *config.Control, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error)
+	Reset(ctx context.Context) error
+	Start(ctx context.Context, clientAccess *clientaccess.Info) error
+	Test(ctx context.Context) error
+	EndpointName() string
+}
+
+// RegisterDriver appends d to the list of available managed drivers.
+func RegisterDriver(d Driver) {
+	drivers = append(drivers, d)
+}
+
+// Registered returns the drivers added through RegisterDriver.
+func Registered() []Driver {
+	return drivers
+}
+
+// Default returns the endpoint name of the default driver. When none has
+// been set and exactly one driver is registered, that driver is the default.
+func Default() string {
+	if defaultDriver == "" && len(drivers) == 1 {
+		return drivers[0].EndpointName()
+	}
+	return defaultDriver
+}
diff --git a/pkg/cluster/nocluster.go b/pkg/cluster/nocluster.go
deleted file mode 100644
index 19f5728c2a..0000000000
--- a/pkg/cluster/nocluster.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// +build !dqlite
-
-package cluster
-
-import (
-	"context"
-	"net"
-	"net/http"
-)
-
-func (c *Cluster) testClusterDB(ctx context.Context) error {
-	return nil
-}
-
-func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
-	return l, handler, nil
-}
-
-func (c *Cluster) postJoin(ctx context.Context) error {
-	return nil
-}
-
-func (c *Cluster) dqliteEnabled() bool {
-	return false
-}
diff --git a/pkg/cluster/storage.go b/pkg/cluster/storage.go
index dc3d776de6..51e8aac967 100644
--- a/pkg/cluster/storage.go
+++ b/pkg/cluster/storage.go
@@ -22,7 +22,7 @@ func (c *Cluster) save(ctx context.Context) error {
 	return c.storageClient.Create(ctx, storageKey(c.config.Token), data)
 }
 
-func (c *Cluster) storageJoin(ctx context.Context) error {
+func (c *Cluster) storageBootstrap(ctx context.Context) error {
 	if err := c.startStorage(ctx); err != nil {
 		return err
 	}
diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go
index 521b8839f4..080d6f0e52 100644
--- a/pkg/daemons/config/types.go
+++ b/pkg/daemons/config/types.go
@@ -147,8 +147,10 @@ type ControlRuntimeBootstrap struct {
 type ControlRuntime struct {
 	ControlRuntimeBootstrap
 
-	HTTPBootstrap  bool
-	APIServerReady <-chan struct{}
+	HTTPBootstrap          bool
+	APIServerReady         <-chan struct{}
+	ETCDReady              <-chan struct{}
+	ClusterControllerStart func(ctx context.Context) error
 
 	ClientKubeAPICert string
 	ClientKubeAPIKey  string
diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go
index 4b8d64cb5c..069850cb73 100644
--- a/pkg/daemons/control/server.go
+++ b/pkg/daemons/control/server.go
@@ -309,7 +309,7 @@ func prepare(ctx context.Context, config *config.Control, runtime *config.Contro
 
 	cluster := cluster.New(config)
 
-	if err := cluster.Join(ctx); err != nil {
+	if err := cluster.Bootstrap(ctx); err != nil {
 		return err
 	}
 
@@ -337,7 +337,13 @@ func prepare(ctx context.Context, config *config.Control, runtime *config.Contro
 		return err
 	}
 
-	return cluster.Start(ctx)
+	ready, err := cluster.Start(ctx)
+	if err != nil {
+		return err
+	}
+
+	runtime.ETCDReady = ready
+	return nil
 }
 
 func readTokens(runtime *config.ControlRuntime) error {
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 6316ea4fdf..1da457ce2d 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -115,12 +115,17 @@ func runControllers(ctx context.Context, config *Config) error {
 		return err
 	}
 
+	controlConfig.Runtime.Core = sc.Core
+	if config.ControlConfig.Runtime.ClusterControllerStart != nil {
+		if err := config.ControlConfig.Runtime.ClusterControllerStart(ctx); err != nil {
+			return errors.Wrap(err, "starting cluster controllers")
+		}
+	}
+
 	if err := sc.Start(ctx); err != nil {
 		return err
 	}
 
-	controlConfig.Runtime.Core = sc.Core
-
 	start := func(ctx context.Context) {
 		if err := masterControllers(ctx, sc, config); err != nil {
 			panic(err)