mirror of https://github.com/k3s-io/k3s.git
Merge pull request #2300 from brandond/fix_2249
Fix managed etcd cold startup deadlock issue #2249pull/2319/head
commit
714227bdc7
|
@ -54,13 +54,12 @@ func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) *config.Node
|
|||
type HTTPRequester func(u string, client *http.Client, username, password string) ([]byte, error)
|
||||
|
||||
func Request(path string, info *clientaccess.Info, requester HTTPRequester) ([]byte, error) {
|
||||
u, err := url.Parse(info.URL)
|
||||
u, err := url.Parse(info.BaseURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u.Path = path
|
||||
username, password, _ := clientaccess.ParseUsernamePassword(info.Token)
|
||||
return requester(u.String(), clientaccess.GetHTTPClient(info.CACerts), username, password)
|
||||
return requester(u.String(), clientaccess.GetHTTPClient(info.CACerts), info.Username, info.Password)
|
||||
}
|
||||
|
||||
func getNodeNamedCrt(nodeName, nodeIP, nodePasswordFile string) HTTPRequester {
|
||||
|
|
|
@ -152,7 +152,7 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
|
|||
}
|
||||
|
||||
for {
|
||||
newToken, err := clientaccess.NormalizeAndValidateTokenForUser(proxy.SupervisorURL(), cfg.Token, "node")
|
||||
newToken, err := clientaccess.ParseAndValidateTokenForUser(proxy.SupervisorURL(), cfg.Token, "node")
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
select {
|
||||
|
@ -162,7 +162,7 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
|
|||
}
|
||||
continue
|
||||
}
|
||||
cfg.Token = newToken
|
||||
cfg.Token = newToken.String()
|
||||
break
|
||||
}
|
||||
|
||||
|
|
|
@ -1,278 +0,0 @@
|
|||
package clientaccess
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
||||
)
|
||||
|
||||
var (
|
||||
insecureClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
type OverrideURLCallback func(config []byte) (*url.URL, error)
|
||||
|
||||
type clientToken struct {
|
||||
caHash string
|
||||
username string
|
||||
password string
|
||||
}
|
||||
|
||||
func WriteClientKubeConfig(destFile, url, serverCAFile, clientCertFile, clientKeyFile string) error {
|
||||
serverCA, err := ioutil.ReadFile(serverCAFile)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to read %s", serverCAFile)
|
||||
}
|
||||
|
||||
clientCert, err := ioutil.ReadFile(clientCertFile)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to read %s", clientCertFile)
|
||||
}
|
||||
|
||||
clientKey, err := ioutil.ReadFile(clientKeyFile)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to read %s", clientKeyFile)
|
||||
}
|
||||
|
||||
config := clientcmdapi.NewConfig()
|
||||
|
||||
cluster := clientcmdapi.NewCluster()
|
||||
cluster.CertificateAuthorityData = serverCA
|
||||
cluster.Server = url
|
||||
|
||||
authInfo := clientcmdapi.NewAuthInfo()
|
||||
authInfo.ClientCertificateData = clientCert
|
||||
authInfo.ClientKeyData = clientKey
|
||||
|
||||
context := clientcmdapi.NewContext()
|
||||
context.AuthInfo = "default"
|
||||
context.Cluster = "default"
|
||||
|
||||
config.Clusters["default"] = cluster
|
||||
config.AuthInfos["default"] = authInfo
|
||||
config.Contexts["default"] = context
|
||||
config.CurrentContext = "default"
|
||||
|
||||
return clientcmd.WriteToFile(*config, destFile)
|
||||
}
|
||||
|
||||
type Info struct {
|
||||
URL string `json:"url,omitempty"`
|
||||
CACerts []byte `json:"cacerts,omitempty"`
|
||||
username string
|
||||
password string
|
||||
Token string `json:"token,omitempty"`
|
||||
}
|
||||
|
||||
func (i *Info) ToToken() string {
|
||||
return fmt.Sprintf("K10%s::%s:%s", hashCA(i.CACerts), i.username, i.password)
|
||||
}
|
||||
|
||||
func NormalizeAndValidateTokenForUser(server, token, user string) (string, error) {
|
||||
if !strings.HasPrefix(token, "K10") {
|
||||
token = "K10::" + user + ":" + token
|
||||
}
|
||||
info, err := ParseAndValidateToken(server, token)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if info.username != user {
|
||||
info.username = user
|
||||
}
|
||||
|
||||
return info.ToToken(), nil
|
||||
}
|
||||
|
||||
func ParseAndValidateToken(server, token string) (*Info, error) {
|
||||
url, err := url.Parse(server)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "Invalid url, failed to parse %s", server)
|
||||
}
|
||||
|
||||
if url.Scheme != "https" {
|
||||
return nil, fmt.Errorf("only https:// URLs are supported, invalid scheme: %s", server)
|
||||
}
|
||||
|
||||
for strings.HasSuffix(url.Path, "/") {
|
||||
url.Path = url.Path[:len(url.Path)-1]
|
||||
}
|
||||
|
||||
parsedToken, err := parseToken(token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cacerts, err := GetCACerts(*url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(cacerts) > 0 && len(parsedToken.caHash) > 0 {
|
||||
if ok, hash, newHash := validateCACerts(cacerts, parsedToken.caHash); !ok {
|
||||
return nil, fmt.Errorf("token does not match the server %s != %s", hash, newHash)
|
||||
}
|
||||
}
|
||||
|
||||
if err := validateToken(*url, cacerts, parsedToken.username, parsedToken.password); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
i := &Info{
|
||||
URL: url.String(),
|
||||
CACerts: cacerts,
|
||||
username: parsedToken.username,
|
||||
password: parsedToken.password,
|
||||
Token: token,
|
||||
}
|
||||
|
||||
// normalize token
|
||||
i.Token = i.ToToken()
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func validateToken(u url.URL, cacerts []byte, username, password string) error {
|
||||
u.Path = "/cacerts"
|
||||
_, err := get(u.String(), GetHTTPClient(cacerts), username, password)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "token is not valid")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateCACerts(cacerts []byte, hash string) (bool, string, string) {
|
||||
if len(cacerts) == 0 && hash == "" {
|
||||
return true, "", ""
|
||||
}
|
||||
|
||||
newHash := hashCA(cacerts)
|
||||
return hash == newHash, hash, newHash
|
||||
}
|
||||
|
||||
func hashCA(cacerts []byte) string {
|
||||
digest := sha256.Sum256(cacerts)
|
||||
return hex.EncodeToString(digest[:])
|
||||
}
|
||||
|
||||
func ParseUsernamePassword(token string) (string, string, bool) {
|
||||
parsed, err := parseToken(token)
|
||||
if err != nil {
|
||||
return "", "", false
|
||||
}
|
||||
return parsed.username, parsed.password, true
|
||||
}
|
||||
|
||||
func parseToken(token string) (clientToken, error) {
|
||||
var result clientToken
|
||||
|
||||
if !strings.HasPrefix(token, "K10") {
|
||||
return result, fmt.Errorf("token is not a valid token format")
|
||||
}
|
||||
|
||||
token = token[3:]
|
||||
|
||||
parts := strings.SplitN(token, "::", 2)
|
||||
token = parts[0]
|
||||
if len(parts) > 1 {
|
||||
result.caHash = parts[0]
|
||||
token = parts[1]
|
||||
}
|
||||
|
||||
parts = strings.SplitN(token, ":", 2)
|
||||
if len(parts) != 2 {
|
||||
return result, fmt.Errorf("token credentials are the wrong format")
|
||||
}
|
||||
|
||||
result.username = parts[0]
|
||||
result.password = parts[1]
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func GetHTTPClient(cacerts []byte) *http.Client {
|
||||
if len(cacerts) == 0 {
|
||||
return http.DefaultClient
|
||||
}
|
||||
|
||||
pool := x509.NewCertPool()
|
||||
pool.AppendCertsFromPEM(cacerts)
|
||||
|
||||
return &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DisableKeepAlives: true,
|
||||
TLSClientConfig: &tls.Config{
|
||||
RootCAs: pool,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func Get(path string, info *Info) ([]byte, error) {
|
||||
u, err := url.Parse(info.URL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u.Path = path
|
||||
return get(u.String(), GetHTTPClient(info.CACerts), info.username, info.password)
|
||||
}
|
||||
|
||||
func GetCACerts(u url.URL) ([]byte, error) {
|
||||
u.Path = "/cacerts"
|
||||
url := u.String()
|
||||
|
||||
_, err := get(url, http.DefaultClient, "", "")
|
||||
if err == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
cacerts, err := get(url, insecureClient, "", "")
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to get CA certs at %s", url)
|
||||
}
|
||||
|
||||
_, err = get(url, GetHTTPClient(cacerts), "", "")
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "server %s is not trusted", url)
|
||||
}
|
||||
|
||||
return cacerts, nil
|
||||
}
|
||||
|
||||
func get(u string, client *http.Client, username, password string) ([]byte, error) {
|
||||
req, err := http.NewRequest(http.MethodGet, u, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if username != "" {
|
||||
req.SetBasicAuth(username, password)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("%s: %s", u, resp.Status)
|
||||
}
|
||||
|
||||
return ioutil.ReadAll(resp.Body)
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
package clientaccess
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
||||
)
|
||||
|
||||
// WriteClientKubeConfig generates a kubeconfig at destFile that can be used to connect to a server at url with the given certs and keys
|
||||
func WriteClientKubeConfig(destFile string, url string, serverCAFile string, clientCertFile string, clientKeyFile string) error {
|
||||
serverCA, err := ioutil.ReadFile(serverCAFile)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to read %s", serverCAFile)
|
||||
}
|
||||
|
||||
clientCert, err := ioutil.ReadFile(clientCertFile)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to read %s", clientCertFile)
|
||||
}
|
||||
|
||||
clientKey, err := ioutil.ReadFile(clientKeyFile)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to read %s", clientKeyFile)
|
||||
}
|
||||
|
||||
config := clientcmdapi.NewConfig()
|
||||
|
||||
cluster := clientcmdapi.NewCluster()
|
||||
cluster.CertificateAuthorityData = serverCA
|
||||
cluster.Server = url
|
||||
|
||||
authInfo := clientcmdapi.NewAuthInfo()
|
||||
authInfo.ClientCertificateData = clientCert
|
||||
authInfo.ClientKeyData = clientKey
|
||||
|
||||
context := clientcmdapi.NewContext()
|
||||
context.AuthInfo = "default"
|
||||
context.Cluster = "default"
|
||||
|
||||
config.Clusters["default"] = cluster
|
||||
config.AuthInfos["default"] = authInfo
|
||||
config.Contexts["default"] = context
|
||||
config.CurrentContext = "default"
|
||||
|
||||
return clientcmd.WriteToFile(*config, destFile)
|
||||
}
|
|
@ -0,0 +1,277 @@
|
|||
package clientaccess
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultClientTimeout = 20 * time.Second
|
||||
|
||||
defaultClient = &http.Client{
|
||||
Timeout: defaultClientTimeout,
|
||||
}
|
||||
insecureClient = &http.Client{
|
||||
Timeout: defaultClientTimeout,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
tokenPrefix = "K10"
|
||||
tokenFormat = "%s%s::%s:%s"
|
||||
)
|
||||
|
||||
type OverrideURLCallback func(config []byte) (*url.URL, error)
|
||||
|
||||
type Info struct {
|
||||
CACerts []byte `json:"cacerts,omitempty"`
|
||||
BaseURL string `json:"baseurl,omitempty"`
|
||||
Username string `json:"username,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
caHash string
|
||||
}
|
||||
|
||||
// String returns the token data, templated according to the token format
|
||||
func (info *Info) String() string {
|
||||
return fmt.Sprintf(tokenFormat, tokenPrefix, hashCA(info.CACerts), info.Username, info.Password)
|
||||
}
|
||||
|
||||
// ParseAndValidateToken parses a token, downloads and validates the server's CA bundle,
|
||||
// and validates it according to the caHash from the token if set.
|
||||
func ParseAndValidateToken(server string, token string) (*Info, error) {
|
||||
info, err := parseToken(token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := info.setServer(server); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if info.caHash != "" {
|
||||
if err := info.validateCAHash(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// ParseAndValidateToken parses a token with user override, downloads and
|
||||
// validates the server's CA bundle, and validates it according to the caHash from the token if set.
|
||||
func ParseAndValidateTokenForUser(server string, token string, username string) (*Info, error) {
|
||||
info, err := parseToken(token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info.Username = username
|
||||
|
||||
if err := info.setServer(server); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if info.caHash != "" {
|
||||
if err := info.validateCAHash(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// validateCACerts returns a boolean indicating whether or not a CA bundle matches the provided hash,
|
||||
// and a string containing the hash of the CA bundle.
|
||||
func validateCACerts(cacerts []byte, hash string) (bool, string) {
|
||||
if len(cacerts) == 0 && hash == "" {
|
||||
return true, ""
|
||||
}
|
||||
|
||||
newHash := hashCA(cacerts)
|
||||
return hash == newHash, newHash
|
||||
}
|
||||
|
||||
// hashCA returns the hex-encoded SHA256 digest of a byte array.
|
||||
func hashCA(cacerts []byte) string {
|
||||
digest := sha256.Sum256(cacerts)
|
||||
return hex.EncodeToString(digest[:])
|
||||
}
|
||||
|
||||
// ParseUsernamePassword returns the username and password portion of a token string,
|
||||
// along with a bool indicating if the token was successfully parsed.
|
||||
func ParseUsernamePassword(token string) (string, string, bool) {
|
||||
info, err := parseToken(token)
|
||||
if err != nil {
|
||||
return "", "", false
|
||||
}
|
||||
return info.Username, info.Password, true
|
||||
}
|
||||
|
||||
// parseToken parses a token into an Info struct
|
||||
func parseToken(token string) (*Info, error) {
|
||||
var info = &Info{}
|
||||
|
||||
if !strings.HasPrefix(token, tokenPrefix) {
|
||||
token = fmt.Sprintf(tokenFormat, tokenPrefix, "", "", token)
|
||||
}
|
||||
|
||||
// Strip off the prefix
|
||||
token = token[len(tokenPrefix):]
|
||||
|
||||
parts := strings.SplitN(token, "::", 2)
|
||||
token = parts[0]
|
||||
if len(parts) > 1 {
|
||||
info.caHash = parts[0]
|
||||
token = parts[1]
|
||||
}
|
||||
|
||||
parts = strings.SplitN(token, ":", 2)
|
||||
if len(parts) != 2 {
|
||||
return nil, fmt.Errorf("invalid token format")
|
||||
}
|
||||
|
||||
info.Username = parts[0]
|
||||
info.Password = parts[1]
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// GetHTTPClient returns a http client that validates TLS server certificates using the provided CA bundle.
|
||||
// If the CA bundle is empty, it validates using the default http client using the OS CA bundle.
|
||||
// If the CA bundle is not empty but does not contain any valid certs, it validates using
|
||||
// an empty CA bundle (which will always fail).
|
||||
func GetHTTPClient(cacerts []byte) *http.Client {
|
||||
if len(cacerts) == 0 {
|
||||
return defaultClient
|
||||
}
|
||||
|
||||
pool := x509.NewCertPool()
|
||||
pool.AppendCertsFromPEM(cacerts)
|
||||
|
||||
return &http.Client{
|
||||
Timeout: defaultClientTimeout,
|
||||
Transport: &http.Transport{
|
||||
DisableKeepAlives: true,
|
||||
TLSClientConfig: &tls.Config{
|
||||
RootCAs: pool,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Get makes a request to a subpath of info's BaseURL
|
||||
func Get(path string, info *Info) ([]byte, error) {
|
||||
u, err := url.Parse(info.BaseURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u.Path = path
|
||||
return get(u.String(), GetHTTPClient(info.CACerts), info.Username, info.Password)
|
||||
}
|
||||
|
||||
// setServer sets the BaseURL and CACerts fields of the Info by connecting to the server
|
||||
// and storing the CA bundle.
|
||||
func (info *Info) setServer(server string) error {
|
||||
url, err := url.Parse(server)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Invalid server url, failed to parse: %s", server)
|
||||
}
|
||||
|
||||
if url.Scheme != "https" {
|
||||
return fmt.Errorf("only https:// URLs are supported, invalid scheme: %s", server)
|
||||
}
|
||||
|
||||
for strings.HasSuffix(url.Path, "/") {
|
||||
url.Path = url.Path[:len(url.Path)-1]
|
||||
}
|
||||
|
||||
cacerts, err := getCACerts(*url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
info.BaseURL = url.String()
|
||||
info.CACerts = cacerts
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateCAHash validates that info's caHash matches the CACerts hash.
|
||||
func (info *Info) validateCAHash() error {
|
||||
if ok, serverHash := validateCACerts(info.CACerts, info.caHash); !ok {
|
||||
return fmt.Errorf("token CA hash does not match the server CA hash: %s != %s", info.caHash, serverHash)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getCACerts retrieves the CA bundle from a server.
|
||||
// An error is raised if the CA bundle cannot be retrieved,
|
||||
// or if the server's cert is not signed by the returned bundle.
|
||||
func getCACerts(u url.URL) ([]byte, error) {
|
||||
u.Path = "/cacerts"
|
||||
url := u.String()
|
||||
|
||||
// This first request is expected to fail. If the server has
|
||||
// a cert that can be validated using the default CA bundle, return
|
||||
// success with no CA certs.
|
||||
_, err := get(url, defaultClient, "", "")
|
||||
if err == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Download the CA bundle using a client that does not validate certs.
|
||||
cacerts, err := get(url, insecureClient, "", "")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get CA certs")
|
||||
}
|
||||
|
||||
// Request the CA bundle again, validating that the CA bundle can be loaded
|
||||
// and used to validate the server certificate. This should only fail if we somehow
|
||||
// get an empty CA bundle. or if the dynamiclistener cert is incorrectly signed.
|
||||
_, err = get(url, GetHTTPClient(cacerts), "", "")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "CA cert validation failed")
|
||||
}
|
||||
|
||||
return cacerts, nil
|
||||
}
|
||||
|
||||
// get makes a request to a url using a provided client, username, and password,
|
||||
// returning the response body.
|
||||
func get(u string, client *http.Client, username, password string) ([]byte, error) {
|
||||
req, err := http.NewRequest(http.MethodGet, u, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if username != "" {
|
||||
req.SetBasicAuth(username, password)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("%s: %s", u, resp.Status)
|
||||
}
|
||||
|
||||
return ioutil.ReadAll(resp.Body)
|
||||
}
|
|
@ -3,7 +3,7 @@ package cluster
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
|
@ -13,18 +13,22 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Bootstrap attempts to load a managed database driver, if one has been initialized or should be created/joined.
|
||||
// It then checks to see if the cluster needs to load boostrap data, and if so, loads data into the
|
||||
// ControlRuntimeBoostrap struct, either via HTTP or from the datastore.
|
||||
func (c *Cluster) Bootstrap(ctx context.Context) error {
|
||||
if err := c.assignManagedDriver(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
runBootstrap, err := c.shouldBootstrapLoad()
|
||||
shouldBootstrap, err := c.shouldBootstrapLoad(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.shouldBootstrap = runBootstrap
|
||||
|
||||
if runBootstrap {
|
||||
c.shouldBootstrap = shouldBootstrap
|
||||
|
||||
if shouldBootstrap {
|
||||
if err := c.bootstrap(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -33,48 +37,72 @@ func (c *Cluster) Bootstrap(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) shouldBootstrapLoad() (bool, error) {
|
||||
// shouldBootstrapLoad returns true if we need to load ControlRuntimeBootstrap data again.
|
||||
// This is controlled by a stamp file on disk that records successful bootstrap using a hash of the join token.
|
||||
func (c *Cluster) shouldBootstrapLoad(ctx context.Context) (bool, error) {
|
||||
// Non-nil managedDB indicates that the database is either initialized, initializing, or joining
|
||||
if c.managedDB != nil {
|
||||
c.runtime.HTTPBootstrap = true
|
||||
if c.config.JoinURL == "" {
|
||||
|
||||
isInitialized, err := c.managedDB.IsInitialized(ctx, c.config)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if isInitialized {
|
||||
// If the database is initialized we skip bootstrapping; if the user wants to rejoin a
|
||||
// cluster they need to delete the database.
|
||||
logrus.Infof("Managed %s cluster bootstrap already complete and initialized", c.managedDB.EndpointName())
|
||||
return false, nil
|
||||
}
|
||||
} else if c.config.JoinURL == "" {
|
||||
// Not initialized, not joining - must be initializing (cluster-init)
|
||||
logrus.Infof("Managed %s cluster initializing", c.managedDB.EndpointName())
|
||||
return false, nil
|
||||
} else {
|
||||
// Not initialized, but have a Join URL - fail if there's no token; if there is then validate it.
|
||||
if c.config.Token == "" {
|
||||
return false, errors.New(version.ProgramUpper + "_TOKEN is required to join a cluster")
|
||||
}
|
||||
|
||||
token, err := clientaccess.NormalizeAndValidateTokenForUser(c.config.JoinURL, c.config.Token, "server")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
// Fail if the token isn't syntactically valid, or if the CA hash on the remote server doesn't match
|
||||
// the hash in the token. The password isn't actually checked until later when actually bootstrapping.
|
||||
info, err := clientaccess.ParseAndValidateTokenForUser(c.config.JoinURL, c.config.Token, "server")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
info, err := clientaccess.ParseAndValidateToken(c.config.JoinURL, token)
|
||||
if err != nil {
|
||||
return false, err
|
||||
logrus.Infof("Managed %s cluster not yet initialized", c.managedDB.EndpointName())
|
||||
c.clientAccessInfo = info
|
||||
}
|
||||
c.clientAccessInfo = info
|
||||
}
|
||||
|
||||
// Check the stamp file to see if we have successfully bootstrapped using this token.
|
||||
// NOTE: The fact that we use a hash of the token to generate the stamp
|
||||
// means that it is unsafe to use the same token for multiple clusters.
|
||||
stamp := c.bootstrapStamp()
|
||||
if _, err := os.Stat(stamp); err == nil {
|
||||
logrus.Info("Cluster bootstrap already complete")
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if c.managedDB != nil && c.config.Token == "" {
|
||||
return false, fmt.Errorf("K3S_TOKEN is required to join a cluster")
|
||||
}
|
||||
|
||||
// No errors and no bootstrap stamp, need to bootstrap.
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// bootstrapped touches a file to indicate that bootstrap has been completed.
|
||||
func (c *Cluster) bootstrapped() error {
|
||||
if err := os.MkdirAll(filepath.Dir(c.bootstrapStamp()), 0700); err != nil {
|
||||
stamp := c.bootstrapStamp()
|
||||
if err := os.MkdirAll(filepath.Dir(stamp), 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := os.Stat(c.bootstrapStamp()); err == nil {
|
||||
// return if file already exists
|
||||
if _, err := os.Stat(stamp); err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
f, err := os.Create(c.bootstrapStamp())
|
||||
// otherwise try to create it
|
||||
f, err := os.Create(stamp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -82,6 +110,9 @@ func (c *Cluster) bootstrapped() error {
|
|||
return f.Close()
|
||||
}
|
||||
|
||||
// httpBootstrap retrieves bootstrap data (certs and keys, etc) from the remote server via HTTP
|
||||
// and loads it into the ControlRuntimeBootstrap struct. Unlike the storage bootstrap path,
|
||||
// this data does not need to be decrypted since it is generated on-demand by an existing server.
|
||||
func (c *Cluster) httpBootstrap() error {
|
||||
content, err := clientaccess.Get("/v1-"+version.Program+"/server-bootstrap", c.clientAccessInfo)
|
||||
if err != nil {
|
||||
|
@ -91,20 +122,22 @@ func (c *Cluster) httpBootstrap() error {
|
|||
return bootstrap.Read(bytes.NewBuffer(content), &c.runtime.ControlRuntimeBootstrap)
|
||||
}
|
||||
|
||||
// bootstrap performs cluster bootstrapping, either via HTTP (for managed databases) or direct load from datastore.
|
||||
func (c *Cluster) bootstrap(ctx context.Context) error {
|
||||
c.joining = true
|
||||
|
||||
// bootstrap managed database via HTTP
|
||||
if c.runtime.HTTPBootstrap {
|
||||
return c.httpBootstrap()
|
||||
}
|
||||
|
||||
if err := c.storageBootstrap(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
// Bootstrap directly from datastore
|
||||
return c.storageBootstrap(ctx)
|
||||
}
|
||||
|
||||
// bootstrapStamp returns the path to a file in datadir/db that is used to record
|
||||
// that a cluster has been joined. The filename is based on a portion of the sha256 hash of the token.
|
||||
// We hash the token value exactly as it is provided by the user, NOT the normalized version.
|
||||
func (c *Cluster) bootstrapStamp() string {
|
||||
return filepath.Join(c.config.DataDir, "db/joined-"+keyHash(c.config.Token))
|
||||
}
|
||||
|
|
|
@ -25,26 +25,34 @@ type Cluster struct {
|
|||
storageClient client.Client
|
||||
}
|
||||
|
||||
// Start creates the dynamic tls listener, http request handler,
|
||||
// handles starting and writing/reading bootstrap data, and returns a channel
|
||||
// that will be closed when datastore is ready.
|
||||
func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
|
||||
// Set up the dynamiclistener and http request handlers
|
||||
if err := c.initClusterAndHTTPS(ctx); err != nil {
|
||||
return nil, errors.Wrap(err, "start cluster and https")
|
||||
return nil, errors.Wrap(err, "init cluster datastore and https")
|
||||
}
|
||||
|
||||
// start managed database (if necessary)
|
||||
if err := c.start(ctx); err != nil {
|
||||
return nil, errors.Wrap(err, "start cluster and https")
|
||||
return nil, errors.Wrap(err, "start managed database")
|
||||
}
|
||||
|
||||
// get the wait channel for testing managed database readiness
|
||||
ready, err := c.testClusterDB(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if necessary, store bootstrap data to datastore
|
||||
if c.saveBootstrap {
|
||||
if err := c.save(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// if necessary, record successful bootstrap
|
||||
if c.shouldBootstrap {
|
||||
if err := c.bootstrapped(); err != nil {
|
||||
return nil, err
|
||||
|
@ -54,17 +62,25 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
|
|||
return ready, c.startStorage(ctx)
|
||||
}
|
||||
|
||||
// startStorage starts the kine listener and configures the endpoints, if necessary.
|
||||
// This calls into the kine endpoint code, which sets up the database client
|
||||
// and unix domain socket listener if using an external database. In the case of an etcd
|
||||
// backend it just returns the user-provided etcd endpoints and tls config.
|
||||
func (c *Cluster) startStorage(ctx context.Context) error {
|
||||
if c.storageStarted {
|
||||
return nil
|
||||
}
|
||||
c.storageStarted = true
|
||||
|
||||
// start listening on the kine socket as an etcd endpoint, or return the external etcd endpoints
|
||||
etcdConfig, err := endpoint.Listen(ctx, c.config.Datastore)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "creating storage endpoint")
|
||||
}
|
||||
|
||||
// Persist the returned etcd configuration. We decide if we're doing leader election for embedded controllers
|
||||
// based on what the kine wrapper tells us about the datastore. Single-node datastores like sqlite don't require
|
||||
// leader election, while basically all others (etcd, external database, etc) do since they allow multiple servers.
|
||||
c.etcdConfig = etcdConfig
|
||||
c.config.Datastore.Config = etcdConfig.TLSConfig
|
||||
c.config.Datastore.Endpoint = strings.Join(etcdConfig.Endpoints, ",")
|
||||
|
@ -72,6 +88,7 @@ func (c *Cluster) startStorage(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// New creates an initial cluster using the provided configuration
|
||||
func New(config *config.Control) *Cluster {
|
||||
return &Cluster{
|
||||
config: config,
|
||||
|
|
|
@ -16,18 +16,21 @@ import (
|
|||
"golang.org/x/crypto/pbkdf2"
|
||||
)
|
||||
|
||||
// storageKey returns the etcd key for storing bootstrap data for a given passphrase.
|
||||
// The key is derived from the sha256 hash of the passphrase.
|
||||
func storageKey(passphrase string) string {
|
||||
d := sha256.New()
|
||||
d.Write([]byte(passphrase))
|
||||
return "/bootstrap/" + hex.EncodeToString(d.Sum(nil)[:])[:12]
|
||||
return "/bootstrap/" + keyHash(passphrase)
|
||||
}
|
||||
|
||||
// keyHash returns the first 12 characters of the sha256 sum of the passphrase.
|
||||
func keyHash(passphrase string) string {
|
||||
d := sha256.New()
|
||||
d.Write([]byte(passphrase))
|
||||
return hex.EncodeToString(d.Sum(nil)[:])[:12]
|
||||
}
|
||||
|
||||
// encrypt encrypts a byte slice using aes+gcm with a pbkdf2 key derived from the passphrase and a random salt.
|
||||
// It returns a byte slice containing the salt and base64-encoded cyphertext.
|
||||
func encrypt(passphrase string, plaintext []byte) ([]byte, error) {
|
||||
salt, err := token.Random(8)
|
||||
if err != nil {
|
||||
|
@ -55,6 +58,8 @@ func encrypt(passphrase string, plaintext []byte) ([]byte, error) {
|
|||
return []byte(salt + ":" + base64.StdEncoding.EncodeToString(sealed)), nil
|
||||
}
|
||||
|
||||
// decrypt attempts to decrypt the byte slice using the supplied passphrase.
|
||||
// The input byte slice should be the cyphertext output from the encrypt function.
|
||||
func decrypt(passphrase string, ciphertext []byte) ([]byte, error) {
|
||||
parts := strings.SplitN(string(ciphertext), ":", 2)
|
||||
if len(parts) != 2 {
|
||||
|
|
|
@ -17,8 +17,12 @@ import (
|
|||
"github.com/rancher/k3s/pkg/version"
|
||||
"github.com/rancher/wrangler-api/pkg/generated/controllers/core"
|
||||
"github.com/sirupsen/logrus"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
// newListener returns a new TCP listener and HTTP reqest handler using dynamiclistener.
|
||||
// dynamiclistener will use the cluster's Server CA to sign the dynamically generate certificate,
|
||||
// and will sync the certs into the Kubernetes datastore, with a local disk cache.
|
||||
func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler, error) {
|
||||
tcp, err := dynamiclistener.NewTCPListener(c.config.BindAddress, c.config.SupervisorPort)
|
||||
if err != nil {
|
||||
|
@ -32,44 +36,52 @@ func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler,
|
|||
|
||||
storage := tlsStorage(ctx, c.config.DataDir, c.runtime)
|
||||
return dynamiclistener.NewListener(tcp, storage, cert, key, dynamiclistener.Config{
|
||||
CN: version.Program,
|
||||
Organization: []string{version.Program},
|
||||
ExpirationDaysCheck: config.CertificateRenewDays,
|
||||
Organization: []string{version.Program},
|
||||
SANs: append(c.config.SANs, "localhost", "kubernetes", "kubernetes.default", "kubernetes.default.svc."+c.config.ClusterDomain),
|
||||
CN: version.Program,
|
||||
TLSConfig: &tls.Config{
|
||||
ClientAuth: tls.RequestClientCert,
|
||||
MinVersion: c.config.TLSMinVersion,
|
||||
CipherSuites: c.config.TLSCipherSuites,
|
||||
},
|
||||
SANs: append(c.config.SANs, "localhost", "kubernetes", "kubernetes.default", "kubernetes.default.svc."+c.config.ClusterDomain),
|
||||
ExpirationDaysCheck: config.CertificateRenewDays,
|
||||
})
|
||||
}
|
||||
|
||||
// initClusterAndHTTPS sets up the dynamic tls listener, request router,
|
||||
// and cluster database. Once the database is up, it starts the supervisor http server.
|
||||
func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error {
|
||||
l, handler, err := c.newListener(ctx)
|
||||
// Set up dynamiclistener TLS listener and request handler
|
||||
listener, handler, err := c.newListener(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get the base request handler
|
||||
handler, err = c.getHandler(handler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l, handler, err = c.initClusterDB(ctx, l, handler)
|
||||
// Config the cluster database and allow it to add additional request handlers
|
||||
handler, err = c.initClusterDB(ctx, handler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a HTTP server with the registered request handlers, using logrus for logging
|
||||
server := http.Server{
|
||||
Handler: handler,
|
||||
ErrorLog: log.New(logrus.StandardLogger().Writer(), "Cluster-Http-Server ", log.LstdFlags),
|
||||
}
|
||||
|
||||
// Start the supervisor http server on the tls listener
|
||||
go func() {
|
||||
err := server.Serve(l)
|
||||
err := server.Serve(listener)
|
||||
logrus.Fatalf("server stopped: %v", err)
|
||||
}()
|
||||
|
||||
// Shutdown the http server when the context is closed
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
server.Shutdown(context.Background())
|
||||
|
@ -78,10 +90,12 @@ func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// tlsStorage creates an in-memory cache for dynamiclistener's certificate, backed by a file on disk
|
||||
// and the Kubernetes datastore.
|
||||
func tlsStorage(ctx context.Context, dataDir string, runtime *config.ControlRuntime) dynamiclistener.TLSStorage {
|
||||
fileStorage := file.New(filepath.Join(dataDir, "tls/dynamic-cert.json"))
|
||||
cache := memory.NewBacked(fileStorage)
|
||||
return kubernetes.New(ctx, func() *core.Factory {
|
||||
return runtime.Core
|
||||
}, "kube-system", ""+version.Program+"-serving", cache)
|
||||
}, metav1.NamespaceSystem, version.Program+"-serving", cache)
|
||||
}
|
||||
|
|
|
@ -1,17 +1,19 @@
|
|||
package cluster
|
||||
|
||||
// A managed database is one whose lifecycle we control - initializing the cluster, adding/removing members, taking snapshots, etc.
|
||||
// This is currently just used for the embedded etcd datastore. Kine and other external etcd clusters are NOT considered managed.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/rancher/k3s/pkg/cluster/managed"
|
||||
"github.com/rancher/kine/pkg/endpoint"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// testClusterDB returns a channel that will be closed when the datastore connection is available.
|
||||
// The datastore is tested for readiness every 5 seconds until the test succeeds.
|
||||
func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) {
|
||||
result := make(chan struct{})
|
||||
if c.managedDB == nil {
|
||||
|
@ -40,6 +42,8 @@ func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) {
|
|||
return result, nil
|
||||
}
|
||||
|
||||
// start starts the database, unless a cluster reset has been requested, in which case
|
||||
// it does that instead.
|
||||
func (c *Cluster) start(ctx context.Context) error {
|
||||
if c.managedDB == nil {
|
||||
return nil
|
||||
|
@ -52,21 +56,19 @@ func (c *Cluster) start(ctx context.Context) error {
|
|||
return c.managedDB.Start(ctx, c.clientAccessInfo)
|
||||
}
|
||||
|
||||
func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
|
||||
// initClusterDB registers routes for database info with the http request handler
|
||||
func (c *Cluster) initClusterDB(ctx context.Context, handler http.Handler) (http.Handler, error) {
|
||||
if c.managedDB == nil {
|
||||
return l, handler, nil
|
||||
return handler, nil
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(c.config.Datastore.Endpoint, c.managedDB.EndpointName()+"://") {
|
||||
c.config.Datastore = endpoint.Config{
|
||||
Endpoint: c.managedDB.EndpointName(),
|
||||
}
|
||||
}
|
||||
|
||||
return c.managedDB.Register(ctx, c.config, l, handler)
|
||||
return c.managedDB.Register(ctx, c.config, handler)
|
||||
}
|
||||
|
||||
// assignManagedDriver checks to see if any managed databases are already configured or should be created/joined.
|
||||
// If a driver has been initialized it is used, otherwise we create or join a cluster using the default driver.
|
||||
func (c *Cluster) assignManagedDriver(ctx context.Context) error {
|
||||
// Check all managed drivers for an initialized database on disk; use one if found
|
||||
for _, driver := range managed.Registered() {
|
||||
if ok, err := driver.IsInitialized(ctx, c.config); err != nil {
|
||||
return err
|
||||
|
@ -76,14 +78,7 @@ func (c *Cluster) assignManagedDriver(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
endpointType := strings.SplitN(c.config.Datastore.Endpoint, ":", 2)[0]
|
||||
for _, driver := range managed.Registered() {
|
||||
if endpointType == driver.EndpointName() {
|
||||
c.managedDB = driver
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// If we have been asked to initialize or join a cluster, do so using the default managed database.
|
||||
if c.config.Datastore.Endpoint == "" && (c.config.ClusterInit || (c.config.Token != "" && c.config.JoinURL != "")) {
|
||||
for _, driver := range managed.Registered() {
|
||||
if driver.EndpointName() == managed.Default() {
|
||||
|
|
|
@ -2,7 +2,6 @@ package managed
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
"github.com/rancher/k3s/pkg/clientaccess"
|
||||
|
@ -16,7 +15,7 @@ var (
|
|||
|
||||
type Driver interface {
|
||||
IsInitialized(ctx context.Context, config *config.Control) (bool, error)
|
||||
Register(ctx context.Context, config *config.Control, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error)
|
||||
Register(ctx context.Context, config *config.Control, handler http.Handler) (http.Handler, error)
|
||||
Reset(ctx context.Context, clientAccessInfo *clientaccess.Info) error
|
||||
Start(ctx context.Context, clientAccessInfo *clientaccess.Info) error
|
||||
Test(ctx context.Context, clientAccessInfo *clientaccess.Info) error
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"net/http"
|
||||
)
|
||||
|
||||
// getHandler returns a basic request handler that processes requests through
|
||||
// the cluster's request router chain.
|
||||
func (c *Cluster) getHandler(handler http.Handler) (http.Handler, error) {
|
||||
next := c.router()
|
||||
|
||||
|
@ -13,6 +15,8 @@ func (c *Cluster) getHandler(handler http.Handler) (http.Handler, error) {
|
|||
}), nil
|
||||
}
|
||||
|
||||
// router is a stub request router that returns a Service Unavailable response
|
||||
// if no additional handlers are available.
|
||||
func (c *Cluster) router() http.Handler {
|
||||
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
if c.runtime.Handler == nil {
|
||||
|
|
|
@ -8,6 +8,10 @@ import (
|
|||
"github.com/rancher/kine/pkg/client"
|
||||
)
|
||||
|
||||
// save writes the current ControlRuntimeBootstrap data to the datastore. This contains a complete
|
||||
// snapshot of the cluster's CA certs and keys, encryption passphrases, etc - encrypted with the join token.
|
||||
// This is used when bootstrapping a cluster from a managed database or external etcd cluster.
|
||||
// This is NOT used with embedded etcd, which bootstraps over HTTP.
|
||||
func (c *Cluster) save(ctx context.Context) error {
|
||||
buf := &bytes.Buffer{}
|
||||
if err := bootstrap.Write(buf, &c.runtime.ControlRuntimeBootstrap); err != nil {
|
||||
|
@ -22,6 +26,8 @@ func (c *Cluster) save(ctx context.Context) error {
|
|||
return c.storageClient.Create(ctx, storageKey(c.config.Token), data)
|
||||
}
|
||||
|
||||
// storageBootstrap loads data from the datastore into the ControlRuntimeBootstrap struct.
|
||||
// The storage key and encryption passphrase are both derived from the join token.
|
||||
func (c *Cluster) storageBootstrap(ctx context.Context) error {
|
||||
if err := c.startStorage(ctx); err != nil {
|
||||
return err
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
|
@ -106,19 +105,19 @@ func (e *ETCD) Test(ctx context.Context, clientAccessInfo *clientaccess.Info) er
|
|||
return fmt.Errorf(msg)
|
||||
}
|
||||
|
||||
// walDir returns the path to dataDir/member/wal
|
||||
func walDir(config *config.Control) string {
|
||||
return filepath.Join(dataDir(config), "member", "wal")
|
||||
}
|
||||
|
||||
// dataDir returns the path to dataDir/db/etcd
|
||||
func dataDir(config *config.Control) string {
|
||||
// etcdDBDir returns the path to dataDir/db/etcd
|
||||
func etcdDBDir(config *config.Control) string {
|
||||
return filepath.Join(config.DataDir, "db", "etcd")
|
||||
}
|
||||
|
||||
// nameFile returns the path to dataDir/name
|
||||
// walDir returns the path to etcdDBDir/member/wal
|
||||
func walDir(config *config.Control) string {
|
||||
return filepath.Join(etcdDBDir(config), "member", "wal")
|
||||
}
|
||||
|
||||
// nameFile returns the path to etcdDBDir/name
|
||||
func nameFile(config *config.Control) string {
|
||||
return filepath.Join(dataDir(config), "name")
|
||||
return filepath.Join(etcdDBDir(config), "name")
|
||||
}
|
||||
|
||||
// IsInitialized checks to see if a WAL directory exists. If so, we assume that etcd
|
||||
|
@ -276,20 +275,20 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
|
|||
})
|
||||
}
|
||||
|
||||
// Register configures a new etcd client and adds db info routes for the http listener.
|
||||
func (e *ETCD) Register(ctx context.Context, config *config.Control, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
|
||||
// Register configures a new etcd client and adds db info routes for the http request handler.
|
||||
func (e *ETCD) Register(ctx context.Context, config *config.Control, handler http.Handler) (http.Handler, error) {
|
||||
e.config = config
|
||||
e.runtime = config.Runtime
|
||||
|
||||
client, err := getClient(ctx, e.runtime, endpoint)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
e.client = client
|
||||
|
||||
address, err := getAdvertiseAddress(config.AdvertiseIP)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
e.address = address
|
||||
|
||||
|
@ -299,10 +298,10 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, l net.Liste
|
|||
e.config.Datastore.Config.KeyFile = e.runtime.ClientETCDKey
|
||||
|
||||
if err := e.setName(false); err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return l, e.handler(handler), err
|
||||
return e.handler(handler), err
|
||||
}
|
||||
|
||||
// setName sets a unique name for this cluster member. The first time this is called,
|
||||
|
@ -317,10 +316,10 @@ func (e *ETCD) setName(force bool) error {
|
|||
return err
|
||||
}
|
||||
e.name = strings.SplitN(h, ".", 2)[0] + "-" + uuid.New().String()[:8]
|
||||
if err := os.MkdirAll(filepath.Dir(fileName), 0755); err != nil {
|
||||
if err := os.MkdirAll(filepath.Dir(fileName), 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
return ioutil.WriteFile(fileName, []byte(e.name), 0655)
|
||||
return ioutil.WriteFile(fileName, []byte(e.name), 0600)
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -328,7 +327,7 @@ func (e *ETCD) setName(force bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// handler handles request routing for the base http listener
|
||||
// handler wraps the handler with routes for database info
|
||||
func (e *ETCD) handler(next http.Handler) http.Handler {
|
||||
mux := mux.NewRouter()
|
||||
mux.Handle("/db/info", e.infoHandler())
|
||||
|
@ -453,7 +452,7 @@ func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.Init
|
|||
ListenMetricsURLs: "http://127.0.0.1:2381",
|
||||
ListenPeerURLs: e.peerURL(),
|
||||
AdvertiseClientURLs: e.clientURL(),
|
||||
DataDir: dataDir(e.config),
|
||||
DataDir: etcdDBDir(e.config),
|
||||
ServerTrust: executor.ServerTrust{
|
||||
CertFile: e.config.Runtime.ServerETCDCert,
|
||||
KeyFile: e.config.Runtime.ServerETCDKey,
|
||||
|
@ -633,7 +632,7 @@ func (e *ETCD) setSnapshotFunction(ctx context.Context) {
|
|||
// completion.
|
||||
func (e *ETCD) Restore(ctx context.Context) error {
|
||||
// check the old etcd data dir
|
||||
oldDataDir := dataDir(e.config) + "-old"
|
||||
oldDataDir := etcdDBDir(e.config) + "-old"
|
||||
if s, err := os.Stat(oldDataDir); err == nil && s.IsDir() {
|
||||
logrus.Infof("Etcd already restored from a snapshot. Restart without --snapshot-restore-path flag. Backup and delete ${datadir}/server/db on each peer etcd server and rejoin the nodes")
|
||||
os.Exit(0)
|
||||
|
@ -646,14 +645,14 @@ func (e *ETCD) Restore(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
// move the data directory to a temp path
|
||||
if err := os.Rename(dataDir(e.config), oldDataDir); err != nil {
|
||||
if err := os.Rename(etcdDBDir(e.config), oldDataDir); err != nil {
|
||||
return err
|
||||
}
|
||||
sManager := snapshot.NewV3(nil)
|
||||
if err := sManager.Restore(snapshot.RestoreConfig{
|
||||
SnapshotPath: e.config.ClusterResetRestorePath,
|
||||
Name: e.name,
|
||||
OutputDataDir: dataDir(e.config),
|
||||
OutputDataDir: etcdDBDir(e.config),
|
||||
OutputWALDir: walDir(e.config),
|
||||
PeerURLs: []string{e.peerURL()},
|
||||
InitialCluster: e.name + "=" + e.peerURL(),
|
||||
|
|
Loading…
Reference in New Issue