mirror of https://github.com/k3s-io/k3s.git
Reorganize Driver interface and etcd driver to avoid passing context and config into most calls
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>pull/8439/head
parent
890645924f
commit
002e6c43ee
|
@ -92,11 +92,11 @@ func save(app *cli.Context, cfg *cmds.Server) error {
|
|||
|
||||
ctx := signals.SetupSignalContext()
|
||||
e := etcd.NewETCD()
|
||||
if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil {
|
||||
if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
initialized, err := e.IsInitialized(ctx, &serverConfig.ControlConfig)
|
||||
initialized, err := e.IsInitialized()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -140,7 +140,7 @@ func delete(app *cli.Context, cfg *cmds.Server) error {
|
|||
|
||||
ctx := signals.SetupSignalContext()
|
||||
e := etcd.NewETCD()
|
||||
if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil {
|
||||
if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -180,7 +180,7 @@ func list(app *cli.Context, cfg *cmds.Server) error {
|
|||
|
||||
ctx := signals.SetupSignalContext()
|
||||
e := etcd.NewETCD()
|
||||
if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil {
|
||||
if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -246,7 +246,7 @@ func prune(app *cli.Context, cfg *cmds.Server) error {
|
|||
|
||||
ctx := signals.SetupSignalContext()
|
||||
e := etcd.NewETCD()
|
||||
if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil {
|
||||
if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ func (c *Cluster) shouldBootstrapLoad(ctx context.Context) (bool, bool, error) {
|
|||
if c.managedDB != nil {
|
||||
c.config.Runtime.HTTPBootstrap = true
|
||||
|
||||
isInitialized, err := c.managedDB.IsInitialized(ctx, c.config)
|
||||
isInitialized, err := c.managedDB.IsInitialized()
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
|
@ -430,7 +430,7 @@ func (c *Cluster) Snapshot(ctx context.Context, config *config.Control) error {
|
|||
if c.managedDB == nil {
|
||||
return errors.New("unable to perform etcd snapshot on non-etcd system")
|
||||
}
|
||||
return c.managedDB.Snapshot(ctx, config)
|
||||
return c.managedDB.Snapshot(ctx)
|
||||
}
|
||||
|
||||
// compareConfig verifies that the config of the joining control plane node coincides with the cluster's config
|
||||
|
@ -503,7 +503,7 @@ func (c *Cluster) reconcileEtcd(ctx context.Context) error {
|
|||
}()
|
||||
|
||||
e := etcd.NewETCD()
|
||||
if err := e.SetControlConfig(reconcileCtx, c.config); err != nil {
|
||||
if err := e.SetControlConfig(c.config); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := e.StartEmbeddedTemporary(reconcileCtx); err != nil {
|
||||
|
|
|
@ -15,7 +15,6 @@ import (
|
|||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/etcd"
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
"github.com/rancher/dynamiclistener"
|
||||
"github.com/rancher/dynamiclistener/factory"
|
||||
|
@ -33,9 +32,14 @@ import (
|
|||
// and will sync the certs into the Kubernetes datastore, with a local disk cache.
|
||||
func (c *Cluster) newListener(ctx context.Context) (net.Listener, http.Handler, error) {
|
||||
if c.managedDB != nil {
|
||||
if _, err := os.Stat(etcd.ResetFile(c.config)); err == nil {
|
||||
// delete the dynamic listener file if it exists after restoration to fix restoration
|
||||
// on fresh nodes
|
||||
resetDone, err := c.managedDB.IsReset()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if resetDone {
|
||||
// delete the dynamic listener TLS secret cache after restoring,
|
||||
// to ensure that dynamiclistener doesn't sync the old secret over the top
|
||||
// of whatever was just restored.
|
||||
os.Remove(filepath.Join(c.config.DataDir, "tls/dynamic-cert.json"))
|
||||
}
|
||||
}
|
||||
|
@ -104,8 +108,8 @@ func (c *Cluster) initClusterAndHTTPS(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Config the cluster database and allow it to add additional request handlers
|
||||
handler, err = c.initClusterDB(ctx, handler)
|
||||
// Register database request handlers and controller callbacks
|
||||
handler, err = c.registerDBHandlers(handler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -9,14 +9,12 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/k3s-io/k3s/pkg/cluster/managed"
|
||||
"github.com/k3s-io/k3s/pkg/etcd"
|
||||
"github.com/k3s-io/k3s/pkg/nodepassword"
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
"github.com/k3s-io/kine/pkg/endpoint"
|
||||
"github.com/sirupsen/logrus"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
)
|
||||
|
@ -57,63 +55,62 @@ func (c *Cluster) start(ctx context.Context) error {
|
|||
if c.managedDB == nil {
|
||||
return nil
|
||||
}
|
||||
resetFile := etcd.ResetFile(c.config)
|
||||
rebootstrap := func() error {
|
||||
return c.storageBootstrap(ctx)
|
||||
}
|
||||
|
||||
resetDone, err := c.managedDB.IsReset()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.config.ClusterReset {
|
||||
// If we're restoring from a snapshot, don't check the reset-flag - just reset and restore.
|
||||
if c.config.ClusterResetRestorePath != "" {
|
||||
return c.managedDB.Reset(ctx, rebootstrap)
|
||||
}
|
||||
|
||||
// If the reset-flag doesn't exist, reset. This will create the reset-flag if it succeeds.
|
||||
if _, err := os.Stat(resetFile); err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
if !resetDone {
|
||||
return c.managedDB.Reset(ctx, rebootstrap)
|
||||
}
|
||||
|
||||
// The reset-flag exists, ask the user to remove it if they want to reset again.
|
||||
return fmt.Errorf("Managed etcd cluster membership was previously reset, please remove the cluster-reset flag and start %s normally. If you need to perform another cluster reset, you must first manually delete the %s file", version.Program, resetFile)
|
||||
return fmt.Errorf("Managed etcd cluster membership was previously reset, please remove the cluster-reset flag and start %s normally. "+
|
||||
"If you need to perform another cluster reset, you must first manually delete the file at %s", version.Program, c.managedDB.ResetFile())
|
||||
}
|
||||
|
||||
// The reset-flag exists but we're not resetting; remove it
|
||||
if _, err := os.Stat(resetFile); err == nil {
|
||||
// Before removing reset file we need to delete the node passwd secret in case the node
|
||||
if resetDone {
|
||||
// If the cluster was reset, we need to delete the node passwd secret in case the node
|
||||
// password from the previously restored snapshot differs from the current password on disk.
|
||||
c.config.Runtime.ClusterControllerStarts["node-password-secret-cleanup"] = c.deleteNodePasswdSecret
|
||||
os.Remove(resetFile)
|
||||
}
|
||||
|
||||
// Starting the managed database will clear the reset-flag if set
|
||||
return c.managedDB.Start(ctx, c.clientAccessInfo)
|
||||
}
|
||||
|
||||
// initClusterDB registers routes for database info with the http request handler
|
||||
func (c *Cluster) initClusterDB(ctx context.Context, handler http.Handler) (http.Handler, error) {
|
||||
// registerDBHandlers registers routes for database info with the http request handler
|
||||
func (c *Cluster) registerDBHandlers(handler http.Handler) (http.Handler, error) {
|
||||
if c.managedDB == nil {
|
||||
return handler, nil
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(c.config.Datastore.Endpoint, c.managedDB.EndpointName()+"://") {
|
||||
c.config.Datastore = endpoint.Config{
|
||||
Endpoint: c.managedDB.EndpointName(),
|
||||
}
|
||||
}
|
||||
|
||||
return c.managedDB.Register(ctx, c.config, handler)
|
||||
return c.managedDB.Register(handler)
|
||||
}
|
||||
|
||||
// assignManagedDriver assigns a driver based on a number of different configuration variables.
|
||||
// If a driver has been initialized it is used.
|
||||
// If the configured endpoint matches the name of a driver, that driver is used.
|
||||
// If no specific endpoint has been requested and creating or joining has been requested,
|
||||
// we use the default driver.
|
||||
// If none of the above are true, no managed driver is assigned.
|
||||
func (c *Cluster) assignManagedDriver(ctx context.Context) error {
|
||||
// Check all managed drivers for an initialized database on disk; use one if found
|
||||
for _, driver := range managed.Registered() {
|
||||
if ok, err := driver.IsInitialized(ctx, c.config); err != nil {
|
||||
if err := driver.SetControlConfig(c.config); err != nil {
|
||||
return err
|
||||
}
|
||||
if ok, err := driver.IsInitialized(); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
c.managedDB = driver
|
||||
|
@ -121,24 +118,9 @@ func (c *Cluster) assignManagedDriver(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
// This is needed to allow downstreams to override driver selection logic by
|
||||
// setting ServerConfig.Datastore.Endpoint such that it will match a driver's EndpointName
|
||||
endpointType := strings.SplitN(c.config.Datastore.Endpoint, ":", 2)[0]
|
||||
for _, driver := range managed.Registered() {
|
||||
if endpointType == driver.EndpointName() {
|
||||
c.managedDB = driver
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// If we have been asked to initialize or join a cluster, do so using the default managed database.
|
||||
if c.config.Datastore.Endpoint == "" && (c.config.ClusterInit || (c.config.Token != "" && c.config.JoinURL != "")) {
|
||||
for _, driver := range managed.Registered() {
|
||||
if driver.EndpointName() == managed.Default() {
|
||||
c.managedDB = driver
|
||||
return nil
|
||||
}
|
||||
}
|
||||
c.managedDB = managed.Default()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -9,19 +9,21 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
defaultDriver string
|
||||
drivers []Driver
|
||||
drivers []Driver
|
||||
)
|
||||
|
||||
type Driver interface {
|
||||
IsInitialized(ctx context.Context, config *config.Control) (bool, error)
|
||||
Register(ctx context.Context, config *config.Control, handler http.Handler) (http.Handler, error)
|
||||
SetControlConfig(config *config.Control) error
|
||||
IsInitialized() (bool, error)
|
||||
Register(handler http.Handler) (http.Handler, error)
|
||||
Reset(ctx context.Context, reboostrap func() error) error
|
||||
IsReset() (bool, error)
|
||||
ResetFile() string
|
||||
Start(ctx context.Context, clientAccessInfo *clientaccess.Info) error
|
||||
Test(ctx context.Context) error
|
||||
Restore(ctx context.Context) error
|
||||
EndpointName() string
|
||||
Snapshot(ctx context.Context, config *config.Control) error
|
||||
Snapshot(ctx context.Context) error
|
||||
ReconcileSnapshotData(ctx context.Context) error
|
||||
GetMembersClientURLs(ctx context.Context) ([]string, error)
|
||||
RemoveSelf(ctx context.Context) error
|
||||
|
@ -35,9 +37,6 @@ func Registered() []Driver {
|
|||
return drivers
|
||||
}
|
||||
|
||||
func Default() string {
|
||||
if defaultDriver == "" && len(drivers) == 1 {
|
||||
return drivers[0].EndpointName()
|
||||
}
|
||||
return defaultDriver
|
||||
func Default() Driver {
|
||||
return drivers[0]
|
||||
}
|
||||
|
|
218
pkg/etcd/etcd.go
218
pkg/etcd/etcd.go
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/google/uuid"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/k3s-io/k3s/pkg/clientaccess"
|
||||
"github.com/k3s-io/k3s/pkg/cluster/managed"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/control/deps"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/executor"
|
||||
|
@ -107,6 +108,9 @@ var (
|
|||
|
||||
type NodeControllerGetter func() controllerv1.NodeController
|
||||
|
||||
// explicit interface check
|
||||
var _ managed.Driver = &ETCD{}
|
||||
|
||||
type ETCD struct {
|
||||
client *clientv3.Client
|
||||
config *config.Control
|
||||
|
@ -163,22 +167,16 @@ func (e *ETCD) EndpointName() string {
|
|||
return "etcd"
|
||||
}
|
||||
|
||||
// SetControlConfig sets the given config on the etcd struct.
|
||||
func (e *ETCD) SetControlConfig(ctx context.Context, config *config.Control) error {
|
||||
// SetControlConfig passes the cluster config into the etcd datastore. This is necessary
|
||||
// because the config may not yet be fully built at the time the Driver instance is registered.
|
||||
func (e *ETCD) SetControlConfig(config *config.Control) error {
|
||||
if e.config != nil {
|
||||
return errors.New("control config already set")
|
||||
}
|
||||
|
||||
e.config = config
|
||||
|
||||
client, err := getClient(ctx, e.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.client = client
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
e.client.Close()
|
||||
}()
|
||||
|
||||
address, err := getAdvertiseAddress(config.PrivateIP)
|
||||
address, err := getAdvertiseAddress(e.config.PrivateIP)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -192,6 +190,13 @@ func (e *ETCD) SetControlConfig(ctx context.Context, config *config.Control) err
|
|||
// If it is still a learner or not a part of the cluster, an error is raised.
|
||||
// If it cannot be defragmented or has any alarms that cannot be disarmed, an error is raised.
|
||||
func (e *ETCD) Test(ctx context.Context) error {
|
||||
if e.config == nil {
|
||||
return errors.New("control config not set")
|
||||
}
|
||||
if e.client == nil {
|
||||
return errors.New("etcd datastore is not started")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, testTimeout)
|
||||
defer cancel()
|
||||
|
||||
|
@ -247,7 +252,7 @@ func dbDir(config *config.Control) string {
|
|||
return filepath.Join(config.DataDir, "db", "etcd")
|
||||
}
|
||||
|
||||
// walDir returns the path to etcddbDir/member/wal
|
||||
// walDir returns the path to etcdDBDir/member/wal
|
||||
func walDir(config *config.Control) string {
|
||||
return filepath.Join(dbDir(config), "member", "wal")
|
||||
}
|
||||
|
@ -256,20 +261,50 @@ func sqliteFile(config *config.Control) string {
|
|||
return filepath.Join(config.DataDir, "db", "state.db")
|
||||
}
|
||||
|
||||
// nameFile returns the path to etcddbDir/name.
|
||||
// nameFile returns the path to etcdDBDir/name.
|
||||
func nameFile(config *config.Control) string {
|
||||
return filepath.Join(dbDir(config), "name")
|
||||
}
|
||||
|
||||
// ResetFile returns the path to etcddbDir/reset-flag.
|
||||
func ResetFile(config *config.Control) string {
|
||||
return filepath.Join(config.DataDir, "db", "reset-flag")
|
||||
// clearReset removes the reset file
|
||||
func (e *ETCD) clearReset() error {
|
||||
if err := os.Remove(e.ResetFile()); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsReset checks to see if the reset file exists, indicating that a cluster-reset has been completed successfully.
|
||||
func (e *ETCD) IsReset() (bool, error) {
|
||||
if e.config == nil {
|
||||
return false, errors.New("control config not set")
|
||||
}
|
||||
|
||||
if _, err := os.Stat(e.ResetFile()); err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// ResetFile returns the path to etcdDBDir/reset-flag.
|
||||
func (e *ETCD) ResetFile() string {
|
||||
if e.config == nil {
|
||||
panic("control config not set")
|
||||
}
|
||||
return filepath.Join(e.config.DataDir, "db", "reset-flag")
|
||||
}
|
||||
|
||||
// IsInitialized checks to see if a WAL directory exists. If so, we assume that etcd
|
||||
// has already been brought up at least once.
|
||||
func (e *ETCD) IsInitialized(ctx context.Context, config *config.Control) (bool, error) {
|
||||
dir := walDir(config)
|
||||
func (e *ETCD) IsInitialized() (bool, error) {
|
||||
if e.config == nil {
|
||||
return false, errors.New("control config not set")
|
||||
}
|
||||
|
||||
dir := walDir(e.config)
|
||||
if s, err := os.Stat(dir); err == nil && s.IsDir() {
|
||||
return true, nil
|
||||
} else if os.IsNotExist(err) {
|
||||
|
@ -287,12 +322,13 @@ func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error {
|
|||
t := time.NewTicker(5 * time.Second)
|
||||
defer t.Stop()
|
||||
for range t.C {
|
||||
// resetting the apiaddresses to nil since we are doing a restoration
|
||||
if _, err := e.client.Put(ctx, AddressKey, ""); err != nil {
|
||||
logrus.Warnf("failed to reset api addresses key in etcd: %v", err)
|
||||
continue
|
||||
}
|
||||
if err := e.Test(ctx); err == nil {
|
||||
// reset the apiaddresses to nil since we are doing a restoration
|
||||
if _, err := e.client.Put(ctx, AddressKey, ""); err != nil {
|
||||
logrus.Warnf("failed to reset api addresses key in etcd: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
members, err := e.client.MemberList(ctx)
|
||||
if err != nil {
|
||||
continue
|
||||
|
@ -338,6 +374,10 @@ func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error {
|
|||
}
|
||||
}()
|
||||
|
||||
if err := e.startClient(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If asked to restore from a snapshot, do so
|
||||
if e.config.ClusterResetRestorePath != "" {
|
||||
if e.config.EtcdS3 {
|
||||
|
@ -367,7 +407,7 @@ func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error {
|
|||
return err
|
||||
}
|
||||
// touch a file to avoid multiple resets
|
||||
if err := os.WriteFile(ResetFile(e.config), []byte{}, 0600); err != nil {
|
||||
if err := os.WriteFile(e.ResetFile(), []byte{}, 0600); err != nil {
|
||||
return err
|
||||
}
|
||||
return e.newCluster(ctx, true)
|
||||
|
@ -375,9 +415,13 @@ func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error {
|
|||
|
||||
// Start starts the datastore
|
||||
func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
|
||||
isInitialized, err := e.IsInitialized(ctx, e.config)
|
||||
isInitialized, err := e.IsInitialized()
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "configuration validation failed")
|
||||
return errors.Wrapf(err, "failed to check for initialized etcd datastore")
|
||||
}
|
||||
|
||||
if err := e.startClient(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !e.config.EtcdDisableSnapshots {
|
||||
|
@ -440,6 +484,35 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
|
|||
return nil
|
||||
}
|
||||
|
||||
// startClient sets up the config's datastore endpoints, and starts an etcd client connected to the server endpoint.
|
||||
// The client is destroyed when the context is closed.
|
||||
func (e *ETCD) startClient(ctx context.Context) error {
|
||||
if e.client != nil {
|
||||
return errors.New("etcd datastore already started")
|
||||
}
|
||||
|
||||
endpoints := getEndpoints(e.config)
|
||||
e.config.Datastore.Endpoint = endpoints[0]
|
||||
e.config.Datastore.BackendTLSConfig.CAFile = e.config.Runtime.ETCDServerCA
|
||||
e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert
|
||||
e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey
|
||||
|
||||
client, err := getClient(ctx, e.config, endpoints...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.client = client
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
client := e.client
|
||||
e.client = nil
|
||||
client.Close()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// join attempts to add a member to an existing cluster
|
||||
func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
|
||||
clientCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
|
||||
|
@ -516,33 +589,8 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
|
|||
})
|
||||
}
|
||||
|
||||
// Register configures a new etcd client and adds db info routes for the http request handler.
|
||||
func (e *ETCD) Register(ctx context.Context, config *config.Control, handler http.Handler) (http.Handler, error) {
|
||||
e.config = config
|
||||
|
||||
client, err := getClient(ctx, e.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.client = client
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
e.client.Close()
|
||||
}()
|
||||
|
||||
address, err := getAdvertiseAddress(config.PrivateIP)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.address = address
|
||||
|
||||
endpoints := getEndpoints(config)
|
||||
e.config.Datastore.Endpoint = endpoints[0]
|
||||
e.config.Datastore.BackendTLSConfig.CAFile = e.config.Runtime.ETCDServerCA
|
||||
e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert
|
||||
e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey
|
||||
|
||||
// Register adds db info routes for the http request handler, and registers cluster controller callbacks
|
||||
func (e *ETCD) Register(handler http.Handler) (http.Handler, error) {
|
||||
e.config.Runtime.ClusterControllerStarts["etcd-node-metadata"] = func(ctx context.Context) {
|
||||
registerMetadataHandlers(ctx, e)
|
||||
}
|
||||
|
@ -659,7 +707,6 @@ func getClientConfig(ctx context.Context, control *config.Control, endpoints ...
|
|||
DialTimeout: defaultDialTimeout,
|
||||
DialKeepAliveTime: defaultKeepAliveTime,
|
||||
DialKeepAliveTimeout: defaultKeepAliveTimeout,
|
||||
AutoSyncInterval: defaultKeepAliveTimeout,
|
||||
PermitWithoutStream: true,
|
||||
}
|
||||
|
||||
|
@ -888,6 +935,23 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error {
|
|||
}
|
||||
}()
|
||||
|
||||
if e.client != nil {
|
||||
return errors.New("etcd datastore already started")
|
||||
}
|
||||
|
||||
client, err := getClient(ctx, e.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.client = client
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
client := e.client
|
||||
e.client = nil
|
||||
client.Close()
|
||||
}()
|
||||
|
||||
if err := cp.Copy(etcdDataDir, tmpDataDir, cp.Options{PreserveOwner: true}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1214,25 +1278,10 @@ func snapshotDir(config *config.Control, create bool) (string, error) {
|
|||
// preSnapshotSetup checks to see if the necessary components are in place
|
||||
// to perform an Etcd snapshot. This is necessary primarily for on-demand
|
||||
// snapshots since they're performed before normal Etcd setup is completed.
|
||||
func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) error {
|
||||
func (e *ETCD) preSnapshotSetup(ctx context.Context) error {
|
||||
if e.snapshotSem == nil {
|
||||
e.snapshotSem = semaphore.NewWeighted(maxConcurrentSnapshots)
|
||||
}
|
||||
if e.client == nil {
|
||||
if e.config == nil {
|
||||
e.config = config
|
||||
}
|
||||
client, err := getClient(ctx, e.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.client = client
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
e.client.Close()
|
||||
}()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1324,8 +1373,8 @@ func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, err
|
|||
// Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old and failed
|
||||
// snapshots in excess of the retention limits. This method is used in the internal cron snapshot
|
||||
// system as well as used to do on-demand snapshots.
|
||||
func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
||||
if err := e.preSnapshotSetup(ctx, config); err != nil {
|
||||
func (e *ETCD) Snapshot(ctx context.Context) error {
|
||||
if err := e.preSnapshotSetup(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if !e.snapshotSem.TryAcquire(maxConcurrentSnapshots) {
|
||||
|
@ -1353,7 +1402,22 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
|||
}
|
||||
|
||||
endpoints := getEndpoints(e.config)
|
||||
status, err := e.client.Status(ctx, endpoints[0])
|
||||
var client *clientv3.Client
|
||||
var err error
|
||||
|
||||
// Use the internal client if possible, or create a new one
|
||||
// if run from the CLI.
|
||||
if e.client != nil {
|
||||
client = e.client
|
||||
} else {
|
||||
client, err = getClient(ctx, e.config, endpoints...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer client.Close()
|
||||
}
|
||||
|
||||
status, err := client.Status(ctx, endpoints[0])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to check etcd status for snapshot")
|
||||
}
|
||||
|
@ -1979,7 +2043,7 @@ func (e *ETCD) setSnapshotFunction(ctx context.Context) {
|
|||
// having all the nodes take a snapshot at the exact same time can lead to excessive retry thrashing
|
||||
// when updating the snapshot list configmap.
|
||||
time.Sleep(time.Duration(rand.Float64() * float64(snapshotJitterMax)))
|
||||
if err := e.Snapshot(ctx, e.config); err != nil {
|
||||
if err := e.Snapshot(ctx); err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
})))
|
||||
|
|
|
@ -118,7 +118,11 @@ func Test_UnitETCD_IsInitialized(t *testing.T) {
|
|||
t.Errorf("Prep for ETCD.IsInitialized() failed = %v", err)
|
||||
return
|
||||
}
|
||||
got, err := e.IsInitialized(tt.args.ctx, tt.args.config)
|
||||
if err := e.SetControlConfig(tt.args.config); err != nil {
|
||||
t.Errorf("ETCD.SetControlConfig() failed= %v", err)
|
||||
return
|
||||
}
|
||||
got, err := e.IsInitialized()
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("ETCD.IsInitialized() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
|
@ -196,7 +200,11 @@ func Test_UnitETCD_Register(t *testing.T) {
|
|||
t.Errorf("Setup for ETCD.Register() failed = %v", err)
|
||||
return
|
||||
}
|
||||
_, err := e.Register(tt.args.ctx, tt.args.config, tt.args.handler)
|
||||
if err := e.SetControlConfig(tt.args.config); err != nil {
|
||||
t.Errorf("ETCD.SetControlConfig() failed = %v", err)
|
||||
return
|
||||
}
|
||||
_, err := e.Register(tt.args.handler)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("ETCD.Register() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
|
@ -244,17 +252,13 @@ func Test_UnitETCD_Start(t *testing.T) {
|
|||
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
|
||||
e.config.EtcdDisableSnapshots = true
|
||||
testutil.GenerateRuntime(e.config)
|
||||
client, err := getClient(ctxInfo.ctx, e.config)
|
||||
e.client = client
|
||||
|
||||
return err
|
||||
return nil
|
||||
},
|
||||
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
|
||||
// RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes
|
||||
if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
|
||||
return err
|
||||
}
|
||||
e.client.Close()
|
||||
ctxInfo.cancel()
|
||||
time.Sleep(10 * time.Second)
|
||||
testutil.CleanupDataDir(e.config)
|
||||
|
@ -275,17 +279,13 @@ func Test_UnitETCD_Start(t *testing.T) {
|
|||
setup: func(e *ETCD, ctxInfo *contextInfo) error {
|
||||
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
|
||||
testutil.GenerateRuntime(e.config)
|
||||
client, err := getClient(ctxInfo.ctx, e.config)
|
||||
e.client = client
|
||||
|
||||
return err
|
||||
return nil
|
||||
},
|
||||
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
|
||||
// RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes
|
||||
if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
|
||||
return err
|
||||
}
|
||||
e.client.Close()
|
||||
ctxInfo.cancel()
|
||||
time.Sleep(5 * time.Second)
|
||||
testutil.CleanupDataDir(e.config)
|
||||
|
@ -308,11 +308,6 @@ func Test_UnitETCD_Start(t *testing.T) {
|
|||
if err := testutil.GenerateRuntime(e.config); err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := getClient(ctxInfo.ctx, e.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.client = client
|
||||
return os.MkdirAll(walDir(e.config), 0700)
|
||||
},
|
||||
teardown: func(e *ETCD, ctxInfo *contextInfo) error {
|
||||
|
@ -320,7 +315,6 @@ func Test_UnitETCD_Start(t *testing.T) {
|
|||
if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
|
||||
return err
|
||||
}
|
||||
e.client.Close()
|
||||
ctxInfo.cancel()
|
||||
time.Sleep(5 * time.Second)
|
||||
testutil.CleanupDataDir(e.config)
|
||||
|
|
Loading…
Reference in New Issue