Add etcd s3 config secret implementation

* Move snapshot structs and functions into pkg/etcd/snapshot
* Move s3 client code and functions into pkg/etcd/s3
* Refactor pkg/etcd to track snapshot and s3 moves
* Add support for reading s3 client config from secret
* Add minio client cache, since S3 client configuration can now be
  changed at runtime by modifying the secret, and don't want to have to
  create a new minio client every time we read config.
* Add tests for pkg/etcd/s3

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit c36db53e54)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/10542/head
Brad Davidson 2024-06-11 00:29:17 +00:00 committed by Brad Davidson
parent 30f2267bd5
commit fbde4f7812
19 changed files with 3105 additions and 985 deletions

View File

@ -0,0 +1,83 @@
# Support etcd Snapshot Configuration via Kubernetes Secret
Date: 2024-02-06
Revised: 2024-06-10
## Status
Accepted
## Context
### Current State
K3s currently reads configuration for S3 storage of etcd snapshots from CLI flags and/or configuration files.
Security-conscious users have raised issue with the current state. They want to store snapshots on S3, but do not want
to have credentials visible in config files or systemd units. Users operating in highly secure environments have also
asked for the ability to configure a proxy server to be used when creating/restoring snapshots stored on S3, without
managing complicated `NO_PROXY` settings or affecting the rest of the K3s process environment.
### Security Considerations
Storing credentials on-disk is generally considered a bad idea, and is not allowed by security practices in many
organizations. Use of static credentials in the config file also makes them difficult to rotate, as K3s only reloads the
configuration on startup.
### Existing Work
Cloud-providers and other tools that need to auth to external systems frequently can be configured to retrieve secrets
from an existing credential secret that is provisioned via an external process, such as a secrets management tool. This
avoids embedding the credentials directly in the system configuration, chart values, and so on.
## Decision
* We will add a `--etcd-s3-proxy` flag that can be used to set the proxy used by the S3 client. This will override the
settings that golang's default HTTP client reads from the `HTTP_PROXY/HTTPS_PROXY/NO_PROXY` environment varibles.
* We will add support for reading etcd snapshot S3 configuration from a Secret. The secret name will be specified via a new
`--etcd-s3-config-secret` flag, which accepts the name of the Secret in the `kube-system` namespace.
* Presence of the `--etcd-s3-config-secret` flag does not imply `--etcd-s3`. If S3 is not enabled by use of the `--etcd-s3` flag,
the Secret will not be used.
* The Secret does not need to exist when K3s starts; it will be checked for every time a snapshot operation is performed.
* Secret and CLI/config values will NOT be merged. The Secret will provide values to be used in absence of other
configuration; if S3 configuration is passed via CLI flags or configuration file, ALL fields set by the Secret
will be ignored.
* The Secret will ONLY be used for on-demand and scheduled snapshot save operations; it will not be used by snapshot restore.
Snapshot restore operations that want to retrieve a snapshot from S3 will need to pass the appropriate configuration
via environment variables or CLI flags, as the Secret is not available during the restore process.
Fields within the Secret will match `k3s server` CLI flags / config file keys. For the `etcd-s3-endpoint-ca`, which
normally contains the path of a file on disk, the `etcd-s3-endpoint-ca` field can specify an inline PEM-encoded CA
bundle, or the `etcd-s3-endpoint-ca-name` can be used to specify the name of a ConfigMap in the `kube-system` namespace
containing one or more CA bundles. All valid CA bundles found in either field are loaded.
```yaml
apiVersion: v1
kind: Secret
metadata:
name: k3s-etcd-snapshot-s3-config
namespace: kube-system
stringData:
etcd-s3-endpoint: ""
etcd-s3-endpoint-ca: ""
etcd-s3-endpoint-ca-name: ""
etcd-s3-skip-ssl-verify: "false"
etcd-s3-access-key: "AWS_ACCESS_KEY_ID"
etcd-s3-secret-key: "AWS_SECRET_ACCESS_KEY"
etcd-s3-bucket: "bucket"
etcd-s3-folder: "folder"
etcd-s3-region: "us-east-1"
etcd-s3-insecure: "false"
etcd-s3-timeout: "5m"
etcd-s3-proxy: ""
```
## Consequences
This will require additional documentation, tests, and QA work to validate use of secrets for s3 snapshot configuration.
## Revisions
#### 2024-06-10:
* Changed flag to `etcd-s3-config-secret` to avoid confusion with `etcd-s3-secret-key`.
* Added `etcd-s3-folder` to example Secret.

6
go.mod
View File

@ -149,11 +149,9 @@ require (
github.com/vishvananda/netlink v1.2.1-beta.2
github.com/yl2chen/cidranger v1.0.2
go.etcd.io/etcd/api/v3 v3.5.13
go.etcd.io/etcd/client/pkg/v3 v3.5.13
go.etcd.io/etcd/client/v3 v3.5.13
go.etcd.io/etcd/etcdutl/v3 v3.5.9
go.etcd.io/etcd/etcdutl/v3 v3.5.13
go.etcd.io/etcd/server/v3 v3.5.13
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.23.0
golang.org/x/net v0.25.0
golang.org/x/sync v0.7.0
@ -443,6 +441,7 @@ require (
github.com/xlab/treeprint v1.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.etcd.io/bbolt v1.3.9 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect
go.etcd.io/etcd/client/v2 v2.305.13 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.13 // indirect
go.etcd.io/etcd/raft/v3 v3.5.13 // indirect
@ -464,6 +463,7 @@ require (
go.uber.org/fx v1.20.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/oauth2 v0.17.0 // indirect

View File

@ -100,6 +100,16 @@ var EtcdSnapshotFlags = []cli.Flag{
Usage: "(db) S3 folder",
Destination: &ServerConfig.EtcdS3Folder,
},
&cli.StringFlag{
Name: "s3-proxy,etcd-s3-proxy",
Usage: "(db) Proxy server to use when connecting to S3, overriding any proxy-releated environment variables",
Destination: &ServerConfig.EtcdS3Proxy,
},
&cli.StringFlag{
Name: "s3-config-secret,etcd-s3-config-secret",
Usage: "(db) Name of secret in the kube-system namespace used to configure S3, if etcd-s3 is enabled and no other etcd-s3 options are set",
Destination: &ServerConfig.EtcdS3ConfigSecret,
},
&cli.BoolFlag{
Name: "s3-insecure,etcd-s3-insecure",
Usage: "(db) Disables S3 over HTTPS",

View File

@ -105,6 +105,8 @@ type Server struct {
EtcdS3BucketName string
EtcdS3Region string
EtcdS3Folder string
EtcdS3Proxy string
EtcdS3ConfigSecret string
EtcdS3Timeout time.Duration
EtcdS3Insecure bool
ServiceLBNamespace string
@ -436,6 +438,16 @@ var ServerFlags = []cli.Flag{
Usage: "(db) S3 folder",
Destination: &ServerConfig.EtcdS3Folder,
},
&cli.StringFlag{
Name: "etcd-s3-proxy",
Usage: "(db) Proxy server to use when connecting to S3, overriding any proxy-releated environment variables",
Destination: &ServerConfig.EtcdS3Proxy,
},
&cli.StringFlag{
Name: "etcd-s3-config-secret",
Usage: "(db) Name of secret in the kube-system namespace used to configure S3, if etcd-s3 is enabled and no other etcd-s3 options are set",
Destination: &ServerConfig.EtcdS3ConfigSecret,
},
&cli.BoolFlag{
Name: "etcd-s3-insecure",
Usage: "(db) Disables S3 over HTTPS",

View File

@ -16,6 +16,7 @@ import (
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/cluster/managed"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/etcd"
"github.com/k3s-io/k3s/pkg/proctitle"
"github.com/k3s-io/k3s/pkg/server"
@ -50,17 +51,20 @@ func commandSetup(app *cli.Context, cfg *cmds.Server) (*etcd.SnapshotRequest, *c
}
if cfg.EtcdS3 {
sr.S3 = &etcd.SnapshotRequestS3{}
sr.S3.AccessKey = cfg.EtcdS3AccessKey
sr.S3.Bucket = cfg.EtcdS3BucketName
sr.S3.Endpoint = cfg.EtcdS3Endpoint
sr.S3.EndpointCA = cfg.EtcdS3EndpointCA
sr.S3.Folder = cfg.EtcdS3Folder
sr.S3.Insecure = cfg.EtcdS3Insecure
sr.S3.Region = cfg.EtcdS3Region
sr.S3.SecretKey = cfg.EtcdS3SecretKey
sr.S3.SkipSSLVerify = cfg.EtcdS3SkipSSLVerify
sr.S3.Timeout = metav1.Duration{Duration: cfg.EtcdS3Timeout}
sr.S3 = &config.EtcdS3{
AccessKey: cfg.EtcdS3AccessKey,
Bucket: cfg.EtcdS3BucketName,
ConfigSecret: cfg.EtcdS3ConfigSecret,
Endpoint: cfg.EtcdS3Endpoint,
EndpointCA: cfg.EtcdS3EndpointCA,
Folder: cfg.EtcdS3Folder,
Insecure: cfg.EtcdS3Insecure,
Proxy: cfg.EtcdS3Proxy,
Region: cfg.EtcdS3Region,
SecretKey: cfg.EtcdS3SecretKey,
SkipSSLVerify: cfg.EtcdS3SkipSSLVerify,
Timeout: metav1.Duration{Duration: cfg.EtcdS3Timeout},
}
// extend request timeout to allow the S3 operation to complete
timeout += cfg.EtcdS3Timeout
}

View File

@ -32,6 +32,7 @@ import (
"github.com/rancher/wrangler/pkg/signals"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
kubeapiserverflag "k8s.io/component-base/cli/flag"
"k8s.io/kubernetes/pkg/controlplane"
@ -185,17 +186,22 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.EtcdSnapshotCron = cfg.EtcdSnapshotCron
serverConfig.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir
serverConfig.ControlConfig.EtcdSnapshotRetention = cfg.EtcdSnapshotRetention
serverConfig.ControlConfig.EtcdS3 = cfg.EtcdS3
serverConfig.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint
serverConfig.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA
serverConfig.ControlConfig.EtcdS3SkipSSLVerify = cfg.EtcdS3SkipSSLVerify
serverConfig.ControlConfig.EtcdS3AccessKey = cfg.EtcdS3AccessKey
serverConfig.ControlConfig.EtcdS3SecretKey = cfg.EtcdS3SecretKey
serverConfig.ControlConfig.EtcdS3BucketName = cfg.EtcdS3BucketName
serverConfig.ControlConfig.EtcdS3Region = cfg.EtcdS3Region
serverConfig.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder
serverConfig.ControlConfig.EtcdS3Insecure = cfg.EtcdS3Insecure
serverConfig.ControlConfig.EtcdS3Timeout = cfg.EtcdS3Timeout
if cfg.EtcdS3 {
serverConfig.ControlConfig.EtcdS3 = &config.EtcdS3{
AccessKey: cfg.EtcdS3AccessKey,
Bucket: cfg.EtcdS3BucketName,
ConfigSecret: cfg.EtcdS3ConfigSecret,
Endpoint: cfg.EtcdS3Endpoint,
EndpointCA: cfg.EtcdS3EndpointCA,
Folder: cfg.EtcdS3Folder,
Insecure: cfg.EtcdS3Insecure,
Proxy: cfg.EtcdS3Proxy,
Region: cfg.EtcdS3Region,
SecretKey: cfg.EtcdS3SecretKey,
SkipSSLVerify: cfg.EtcdS3SkipSSLVerify,
Timeout: metav1.Duration{Duration: cfg.EtcdS3Timeout},
}
}
} else {
logrus.Info("ETCD snapshots are disabled")
}

View File

@ -8,13 +8,13 @@ import (
"sort"
"strings"
"sync"
"time"
"github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io"
"github.com/k3s-io/kine/pkg/endpoint"
"github.com/rancher/wharfie/pkg/registries"
"github.com/rancher/wrangler/pkg/generated/controllers/core"
"github.com/rancher/wrangler/pkg/leader"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/client-go/tools/record"
@ -64,6 +64,21 @@ type Node struct {
DefaultRuntime string
}
type EtcdS3 struct {
AccessKey string `json:"accessKey,omitempty"`
Bucket string `json:"bucket,omitempty"`
ConfigSecret string `json:"configSecret,omitempty"`
Endpoint string `json:"endpoint,omitempty"`
EndpointCA string `json:"endpointCA,omitempty"`
Folder string `json:"folder,omitempty"`
Proxy string `json:"proxy,omitempty"`
Region string `json:"region,omitempty"`
SecretKey string `json:"secretKey,omitempty"`
Insecure bool `json:"insecure,omitempty"`
SkipSSLVerify bool `json:"skipSSLVerify,omitempty"`
Timeout metav1.Duration `json:"timeout,omitempty"`
}
type Containerd struct {
Address string
Log string
@ -215,27 +230,17 @@ type Control struct {
EncryptSkip bool
MinTLSVersion string
CipherSuites []string
TLSMinVersion uint16 `json:"-"`
TLSCipherSuites []uint16 `json:"-"`
EtcdSnapshotName string `json:"-"`
EtcdDisableSnapshots bool `json:"-"`
EtcdExposeMetrics bool `json:"-"`
EtcdSnapshotDir string `json:"-"`
EtcdSnapshotCron string `json:"-"`
EtcdSnapshotRetention int `json:"-"`
EtcdSnapshotCompress bool `json:"-"`
EtcdListFormat string `json:"-"`
EtcdS3 bool `json:"-"`
EtcdS3Endpoint string `json:"-"`
EtcdS3EndpointCA string `json:"-"`
EtcdS3SkipSSLVerify bool `json:"-"`
EtcdS3AccessKey string `json:"-"`
EtcdS3SecretKey string `json:"-"`
EtcdS3BucketName string `json:"-"`
EtcdS3Region string `json:"-"`
EtcdS3Folder string `json:"-"`
EtcdS3Timeout time.Duration `json:"-"`
EtcdS3Insecure bool `json:"-"`
TLSMinVersion uint16 `json:"-"`
TLSCipherSuites []uint16 `json:"-"`
EtcdSnapshotName string `json:"-"`
EtcdDisableSnapshots bool `json:"-"`
EtcdExposeMetrics bool `json:"-"`
EtcdSnapshotDir string `json:"-"`
EtcdSnapshotCron string `json:"-"`
EtcdSnapshotRetention int `json:"-"`
EtcdSnapshotCompress bool `json:"-"`
EtcdListFormat string `json:"-"`
EtcdS3 *EtcdS3 `json:"-"`
ServerNodeName string
BindAddress string

View File

@ -12,7 +12,6 @@ import (
"net/url"
"os"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
@ -26,6 +25,8 @@ import (
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control/deps"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/etcd/s3"
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
"github.com/k3s-io/k3s/pkg/server/auth"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
@ -40,10 +41,8 @@ import (
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/etcdutl/v3/snapshot"
"go.uber.org/zap"
snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -93,8 +92,6 @@ var (
ErrAddressNotSet = errors.New("apiserver addresses not yet set")
ErrNotMember = errNotMember()
ErrMemberListFailed = errMemberListFailed()
invalidKeyChars = regexp.MustCompile(`[^-._a-zA-Z0-9]`)
)
type NodeControllerGetter func() controllerv1.NodeController
@ -110,8 +107,8 @@ type ETCD struct {
name string
address string
cron *cron.Cron
s3 *S3
cancel context.CancelFunc
s3 *s3.Controller
snapshotMu *sync.Mutex
}
@ -128,16 +125,16 @@ type Members struct {
Members []*etcdserverpb.Member `json:"members"`
}
type MembershipError struct {
Self string
Members []string
type membershipError struct {
self string
members []string
}
func (e *MembershipError) Error() string {
return fmt.Sprintf("this server is a not a member of the etcd cluster. Found %v, expect: %s", e.Members, e.Self)
func (e *membershipError) Error() string {
return fmt.Sprintf("this server is a not a member of the etcd cluster. Found %v, expect: %s", e.members, e.self)
}
func (e *MembershipError) Is(target error) bool {
func (e *membershipError) Is(target error) bool {
switch target {
case ErrNotMember:
return true
@ -145,17 +142,17 @@ func (e *MembershipError) Is(target error) bool {
return false
}
func errNotMember() error { return &MembershipError{} }
func errNotMember() error { return &membershipError{} }
type MemberListError struct {
Err error
type memberListError struct {
err error
}
func (e *MemberListError) Error() string {
return fmt.Sprintf("failed to get MemberList from server: %v", e.Err)
func (e *memberListError) Error() string {
return fmt.Sprintf("failed to get MemberList from server: %v", e.err)
}
func (e *MemberListError) Is(target error) bool {
func (e *memberListError) Is(target error) bool {
switch target {
case ErrMemberListFailed:
return true
@ -163,7 +160,7 @@ func (e *MemberListError) Is(target error) bool {
return false
}
func errMemberListFailed() error { return &MemberListError{} }
func errMemberListFailed() error { return &memberListError{} }
// NewETCD creates a new value of type
// ETCD with initialized cron and snapshot mutex values.
@ -256,7 +253,7 @@ func (e *ETCD) Test(ctx context.Context) error {
memberNameUrls = append(memberNameUrls, member.Name+"="+member.PeerURLs[0])
}
}
return &MembershipError{Members: memberNameUrls, Self: e.name + "=" + e.peerURL()}
return &membershipError{members: memberNameUrls, self: e.name + "=" + e.peerURL()}
}
// dbDir returns the path to dataDir/db/etcd
@ -391,14 +388,25 @@ func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error {
// If asked to restore from a snapshot, do so
if e.config.ClusterResetRestorePath != "" {
if e.config.EtcdS3 {
if e.config.EtcdS3 != nil {
logrus.Infof("Retrieving etcd snapshot %s from S3", e.config.ClusterResetRestorePath)
if err := e.initS3IfNil(ctx); err != nil {
return err
s3client, err := e.getS3Client(ctx)
if err != nil {
if errors.Is(err, s3.ErrNoConfigSecret) {
return errors.New("cannot use S3 config secret when restoring snapshot; configuration must be set in CLI or config file")
} else {
return errors.Wrap(err, "failed to initialize S3 client")
}
}
if err := e.s3.Download(ctx); err != nil {
return err
dir, err := snapshotDir(e.config, true)
if err != nil {
return errors.Wrap(err, "failed to get the snapshot dir")
}
path, err := s3client.Download(ctx, e.config.ClusterResetRestorePath, dir)
if err != nil {
return errors.Wrap(err, "failed to download snapshot from S3")
}
e.config.ClusterResetRestorePath = path
logrus.Infof("S3 download complete for %s", e.config.ClusterResetRestorePath)
}
@ -442,6 +450,7 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
}
go e.manageLearners(ctx)
go e.getS3Client(ctx)
if isInitialized {
// check etcd dir permission
@ -1416,7 +1425,7 @@ func ClientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info, selfIP
// get the full list from the server we're joining
resp, err := clientAccessInfo.Get("/db/info")
if err != nil {
return nil, memberList, &MemberListError{Err: err}
return nil, memberList, &memberListError{err: err}
}
if err := json.Unmarshal(resp, &memberList); err != nil {
return nil, memberList, err
@ -1463,13 +1472,13 @@ func (e *ETCD) Restore(ctx context.Context) error {
}
var restorePath string
if strings.HasSuffix(e.config.ClusterResetRestorePath, compressedExtension) {
snapshotDir, err := snapshotDir(e.config, true)
if strings.HasSuffix(e.config.ClusterResetRestorePath, snapshot.CompressedExtension) {
dir, err := snapshotDir(e.config, true)
if err != nil {
return errors.Wrap(err, "failed to get the snapshot dir")
}
decompressSnapshot, err := e.decompressSnapshot(snapshotDir, e.config.ClusterResetRestorePath)
decompressSnapshot, err := e.decompressSnapshot(dir, e.config.ClusterResetRestorePath)
if err != nil {
return err
}
@ -1485,13 +1494,7 @@ func (e *ETCD) Restore(ctx context.Context) error {
}
logrus.Infof("Pre-restore etcd database moved to %s", oldDataDir)
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
return err
}
return snapshot.NewV3(lg).Restore(snapshot.RestoreConfig{
return snapshotv3.NewV3(e.client.GetLogger()).Restore(snapshotv3.RestoreConfig{
SnapshotPath: restorePath,
Name: e.name,
OutputDataDir: dbDir(e.config),

View File

@ -11,8 +11,10 @@ import (
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/etcd/s3"
testutil "github.com/k3s-io/k3s/tests"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver"
utilnet "k8s.io/apimachinery/pkg/util/net"
@ -47,10 +49,12 @@ func generateTestConfig() *config.Control {
EtcdSnapshotName: "etcd-snapshot",
EtcdSnapshotCron: "0 */12 * * *",
EtcdSnapshotRetention: 5,
EtcdS3Endpoint: "s3.amazonaws.com",
EtcdS3Region: "us-east-1",
SANs: []string{"127.0.0.1", mustGetAddress()},
CriticalControlArgs: criticalControlArgs,
EtcdS3: &config.EtcdS3{
Endpoint: "s3.amazonaws.com",
Region: "us-east-1",
},
SANs: []string{"127.0.0.1", mustGetAddress()},
CriticalControlArgs: criticalControlArgs,
}
}
@ -112,6 +116,10 @@ func Test_UnitETCD_IsInitialized(t *testing.T) {
want: false,
},
}
// enable logging
logrus.SetLevel(logrus.DebugLevel)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := NewETCD()
@ -227,7 +235,7 @@ func Test_UnitETCD_Start(t *testing.T) {
name string
address string
cron *cron.Cron
s3 *S3
s3 *s3.Controller
}
type args struct {
clientAccessInfo *clientaccess.Info

View File

@ -1,494 +0,0 @@
package etcd
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/pem"
"fmt"
"io/ioutil"
"net/http"
"net/textproto"
"os"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)
var (
clusterIDKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-cluster-id")
tokenHashKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-token-hash")
nodeNameKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-node-name")
)
// S3 maintains state for S3 functionality.
type S3 struct {
config *config.Control
client *minio.Client
clusterID string
tokenHash string
nodeName string
}
// newS3 creates a new value of type s3 pointer with a
// copy of the config.Control pointer and initializes
// a new Minio client.
func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
if config.EtcdS3BucketName == "" {
return nil, errors.New("s3 bucket name was not set")
}
tr := http.DefaultTransport
switch {
case config.EtcdS3EndpointCA != "":
trCA, err := setTransportCA(tr, config.EtcdS3EndpointCA, config.EtcdS3SkipSSLVerify)
if err != nil {
return nil, err
}
tr = trCA
case config.EtcdS3 && config.EtcdS3SkipSSLVerify:
tr.(*http.Transport).TLSClientConfig = &tls.Config{
InsecureSkipVerify: config.EtcdS3SkipSSLVerify,
}
}
var creds *credentials.Credentials
if len(config.EtcdS3AccessKey) == 0 && len(config.EtcdS3SecretKey) == 0 {
creds = credentials.NewIAM("") // for running on ec2 instance
} else {
creds = credentials.NewStaticV4(config.EtcdS3AccessKey, config.EtcdS3SecretKey, "")
}
opt := minio.Options{
Creds: creds,
Secure: !config.EtcdS3Insecure,
Region: config.EtcdS3Region,
Transport: tr,
BucketLookup: bucketLookupType(config.EtcdS3Endpoint),
}
c, err := minio.New(config.EtcdS3Endpoint, &opt)
if err != nil {
return nil, err
}
logrus.Infof("Checking if S3 bucket %s exists", config.EtcdS3BucketName)
ctx, cancel := context.WithTimeout(ctx, config.EtcdS3Timeout)
defer cancel()
exists, err := c.BucketExists(ctx, config.EtcdS3BucketName)
if err != nil {
return nil, errors.Wrapf(err, "failed to test for existence of bucket %s", config.EtcdS3BucketName)
}
if !exists {
return nil, fmt.Errorf("bucket %s does not exist", config.EtcdS3BucketName)
}
logrus.Infof("S3 bucket %s exists", config.EtcdS3BucketName)
s3 := &S3{
config: config,
client: c,
nodeName: os.Getenv("NODE_NAME"),
}
if config.ClusterReset {
logrus.Debug("Skip setting S3 snapshot cluster ID and token during cluster-reset")
} else {
if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
if config.Runtime.Core == nil {
return false, nil
}
// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
ns, err := config.Runtime.Core.Core().V1().Namespace().Get(metav1.NamespaceSystem, metav1.GetOptions{})
if err != nil {
return false, errors.Wrap(err, "failed to set S3 snapshot cluster ID")
}
s3.clusterID = string(ns.UID)
tokenHash, err := util.GetTokenHash(config)
if err != nil {
return false, errors.Wrap(err, "failed to set S3 snapshot server token hash")
}
s3.tokenHash = tokenHash
return true, nil
}); err != nil {
return nil, err
}
}
return s3, nil
}
// upload uploads the given snapshot to the configured S3
// compatible backend.
func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshotFile, error) {
basename := filepath.Base(snapshot)
metadata := filepath.Join(filepath.Dir(snapshot), "..", metadataDir, basename)
snapshotKey := path.Join(s.config.EtcdS3Folder, basename)
metadataKey := path.Join(s.config.EtcdS3Folder, metadataDir, basename)
sf := &snapshotFile{
Name: basename,
Location: fmt.Sprintf("s3://%s/%s", s.config.EtcdS3BucketName, snapshotKey),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
},
S3: &s3Config{
Endpoint: s.config.EtcdS3Endpoint,
EndpointCA: s.config.EtcdS3EndpointCA,
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
Bucket: s.config.EtcdS3BucketName,
Region: s.config.EtcdS3Region,
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
Compressed: strings.HasSuffix(snapshot, compressedExtension),
metadataSource: extraMetadata,
nodeSource: s.nodeName,
}
logrus.Infof("Uploading snapshot to s3://%s/%s", s.config.EtcdS3BucketName, snapshotKey)
uploadInfo, err := s.uploadSnapshot(ctx, snapshotKey, snapshot)
if err != nil {
sf.Status = failedSnapshotStatus
sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error()))
} else {
sf.Status = successfulSnapshotStatus
sf.Size = uploadInfo.Size
sf.tokenHash = s.tokenHash
}
if _, err := s.uploadSnapshotMetadata(ctx, metadataKey, metadata); err != nil {
logrus.Warnf("Failed to upload snapshot metadata to S3: %v", err)
} else {
logrus.Infof("Uploaded snapshot metadata s3://%s/%s", s.config.EtcdS3BucketName, metadataKey)
}
return sf, err
}
// uploadSnapshot uploads the snapshot file to S3 using the minio API.
func (s *S3) uploadSnapshot(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
opts := minio.PutObjectOptions{
NumThreads: 2,
UserMetadata: map[string]string{
clusterIDKey: s.clusterID,
nodeNameKey: s.nodeName,
tokenHashKey: s.tokenHash,
},
}
if strings.HasSuffix(key, compressedExtension) {
opts.ContentType = "application/zip"
} else {
opts.ContentType = "application/octet-stream"
}
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts)
}
// uploadSnapshotMetadata marshals and uploads the snapshot metadata to S3 using the minio API.
// The upload is silently skipped if no extra metadata is provided.
func (s *S3) uploadSnapshotMetadata(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return minio.UploadInfo{}, nil
}
return minio.UploadInfo{}, err
}
opts := minio.PutObjectOptions{
NumThreads: 2,
ContentType: "application/json",
UserMetadata: map[string]string{
clusterIDKey: s.clusterID,
nodeNameKey: s.nodeName,
},
}
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts)
}
// Download downloads the given snapshot from the configured S3
// compatible backend.
func (s *S3) Download(ctx context.Context) error {
snapshotKey := path.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath)
metadataKey := path.Join(s.config.EtcdS3Folder, metadataDir, s.config.ClusterResetRestorePath)
snapshotDir, err := snapshotDir(s.config, true)
if err != nil {
return errors.Wrap(err, "failed to get the snapshot dir")
}
snapshotFile := filepath.Join(snapshotDir, s.config.ClusterResetRestorePath)
metadataFile := filepath.Join(snapshotDir, "..", metadataDir, s.config.ClusterResetRestorePath)
if err := s.downloadSnapshot(ctx, snapshotKey, snapshotFile); err != nil {
return err
}
if err := s.downloadSnapshotMetadata(ctx, metadataKey, metadataFile); err != nil {
return err
}
s.config.ClusterResetRestorePath = snapshotFile
return nil
}
// downloadSnapshot downloads the snapshot file from S3 using the minio API.
func (s *S3) downloadSnapshot(ctx context.Context, key, file string) error {
logrus.Debugf("Downloading snapshot from s3://%s/%s", s.config.EtcdS3BucketName, key)
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
defer os.Chmod(file, 0600)
return s.client.FGetObject(ctx, s.config.EtcdS3BucketName, key, file, minio.GetObjectOptions{})
}
// downloadSnapshotMetadata downloads the snapshot metadata file from S3 using the minio API.
// No error is returned if the metadata file does not exist, as it is optional.
func (s *S3) downloadSnapshotMetadata(ctx context.Context, key, file string) error {
logrus.Debugf("Downloading snapshot metadata from s3://%s/%s", s.config.EtcdS3BucketName, key)
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
defer os.Chmod(file, 0600)
err := s.client.FGetObject(ctx, s.config.EtcdS3BucketName, key, file, minio.GetObjectOptions{})
if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound {
return nil
}
return err
}
// snapshotPrefix returns the prefix used in the
// naming of the snapshots.
func (s *S3) snapshotPrefix() string {
return path.Join(s.config.EtcdS3Folder, s.config.EtcdSnapshotName)
}
// snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node.
// Returns a list of pruned snapshot names.
func (s *S3) snapshotRetention(ctx context.Context) ([]string, error) {
if s.config.EtcdSnapshotRetention < 1 {
return nil, nil
}
logrus.Infof("Applying snapshot retention=%d to snapshots stored in s3://%s/%s", s.config.EtcdSnapshotRetention, s.config.EtcdS3BucketName, s.snapshotPrefix())
var snapshotFiles []minio.ObjectInfo
toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
opts := minio.ListObjectsOptions{
Prefix: s.snapshotPrefix(),
Recursive: true,
}
for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, opts) {
if info.Err != nil {
return nil, info.Err
}
// skip metadata
if path.Base(path.Dir(info.Key)) == metadataDir {
continue
}
snapshotFiles = append(snapshotFiles, info)
}
if len(snapshotFiles) <= s.config.EtcdSnapshotRetention {
return nil, nil
}
// sort newest-first so we can prune entries past the retention count
sort.Slice(snapshotFiles, func(i, j int) bool {
return snapshotFiles[j].LastModified.Before(snapshotFiles[i].LastModified)
})
deleted := []string{}
for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] {
logrus.Infof("Removing S3 snapshot: s3://%s/%s", s.config.EtcdS3BucketName, df.Key)
key := path.Base(df.Key)
if err := s.deleteSnapshot(ctx, key); err != nil {
return deleted, err
}
deleted = append(deleted, key)
}
return deleted, nil
}
func (s *S3) deleteSnapshot(ctx context.Context, key string) error {
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
key = path.Join(s.config.EtcdS3Folder, key)
err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, key, minio.RemoveObjectOptions{})
if err == nil || isNotExist(err) {
metadataKey := path.Join(path.Dir(key), metadataDir, path.Base(key))
if merr := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); merr != nil && !isNotExist(merr) {
err = merr
}
}
return err
}
// listSnapshots provides a list of currently stored
// snapshots in S3 along with their relevant
// metadata.
func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
snapshots := map[string]snapshotFile{}
metadatas := []string{}
ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
opts := minio.ListObjectsOptions{
Prefix: s.config.EtcdS3Folder,
Recursive: true,
}
objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, opts)
for obj := range objects {
if obj.Err != nil {
return nil, obj.Err
}
if obj.Size == 0 {
continue
}
if o, err := s.client.StatObject(ctx, s.config.EtcdS3BucketName, obj.Key, minio.StatObjectOptions{}); err != nil {
logrus.Warnf("Failed to get object metadata: %v", err)
} else {
obj = o
}
filename := path.Base(obj.Key)
if path.Base(path.Dir(obj.Key)) == metadataDir {
metadatas = append(metadatas, obj.Key)
continue
}
basename, compressed := strings.CutSuffix(filename, compressedExtension)
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
if err != nil {
ts = obj.LastModified.Unix()
}
sf := snapshotFile{
Name: filename,
Location: fmt.Sprintf("s3://%s/%s", s.config.EtcdS3BucketName, obj.Key),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: time.Unix(ts, 0),
},
Size: obj.Size,
S3: &s3Config{
Endpoint: s.config.EtcdS3Endpoint,
EndpointCA: s.config.EtcdS3EndpointCA,
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
Bucket: s.config.EtcdS3BucketName,
Region: s.config.EtcdS3Region,
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
Status: successfulSnapshotStatus,
Compressed: compressed,
nodeSource: obj.UserMetadata[nodeNameKey],
tokenHash: obj.UserMetadata[tokenHashKey],
}
sfKey := generateSnapshotConfigMapKey(sf)
snapshots[sfKey] = sf
}
for _, metadataKey := range metadatas {
filename := path.Base(metadataKey)
sfKey := generateSnapshotConfigMapKey(snapshotFile{Name: filename, NodeName: "s3"})
if sf, ok := snapshots[sfKey]; ok {
logrus.Debugf("Loading snapshot metadata from s3://%s/%s", s.config.EtcdS3BucketName, metadataKey)
if obj, err := s.client.GetObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.GetObjectOptions{}); err != nil {
if isNotExist(err) {
logrus.Debugf("Failed to get snapshot metadata: %v", err)
} else {
logrus.Warnf("Failed to get snapshot metadata for %s: %v", filename, err)
}
} else {
if m, err := ioutil.ReadAll(obj); err != nil {
if isNotExist(err) {
logrus.Debugf("Failed to read snapshot metadata: %v", err)
} else {
logrus.Warnf("Failed to read snapshot metadata for %s: %v", filename, err)
}
} else {
sf.Metadata = base64.StdEncoding.EncodeToString(m)
snapshots[sfKey] = sf
}
}
}
}
return snapshots, nil
}
func readS3EndpointCA(endpointCA string) ([]byte, error) {
ca, err := base64.StdEncoding.DecodeString(endpointCA)
if err != nil {
return os.ReadFile(endpointCA)
}
return ca, nil
}
func setTransportCA(tr http.RoundTripper, endpointCA string, insecureSkipVerify bool) (http.RoundTripper, error) {
ca, err := readS3EndpointCA(endpointCA)
if err != nil {
return tr, err
}
if !isValidCertificate(ca) {
return tr, errors.New("endpoint-ca is not a valid x509 certificate")
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(ca)
tr.(*http.Transport).TLSClientConfig = &tls.Config{
RootCAs: certPool,
InsecureSkipVerify: insecureSkipVerify,
}
return tr, nil
}
// isValidCertificate checks to see if the given
// byte slice is a valid x509 certificate.
func isValidCertificate(c []byte) bool {
p, _ := pem.Decode(c)
if p == nil {
return false
}
if _, err := x509.ParseCertificates(p.Bytes); err != nil {
return false
}
return true
}
func bucketLookupType(endpoint string) minio.BucketLookupType {
if strings.Contains(endpoint, "aliyun") { // backwards compt with RKE1
return minio.BucketLookupDNS
}
return minio.BucketLookupAuto
}

View File

@ -0,0 +1,119 @@
package s3
import (
"encoding/base64"
"fmt"
"strconv"
"strings"
"time"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var ErrNoConfigSecret = errNoConfigSecret()
type secretError struct {
err error
}
func (e *secretError) Error() string {
return fmt.Sprintf("failed to get etcd S3 config secret: %v", e.err)
}
func (e *secretError) Is(target error) bool {
switch target {
case ErrNoConfigSecret:
return true
}
return false
}
func errNoConfigSecret() error { return &secretError{} }
func (c *Controller) getConfigFromSecret(secretName string) (*config.EtcdS3, error) {
if c.core == nil {
return nil, &secretError{err: util.ErrCoreNotReady}
}
secret, err := c.core.V1().Secret().Get(metav1.NamespaceSystem, secretName, metav1.GetOptions{})
if err != nil {
return nil, &secretError{err: err}
}
etcdS3 := &config.EtcdS3{
AccessKey: string(secret.Data["etcd-s3-access-key"]),
Bucket: string(secret.Data["etcd-s3-bucket"]),
Endpoint: defaultEtcdS3.Endpoint,
Folder: string(secret.Data["etcd-s3-folder"]),
Proxy: string(secret.Data["etcd-s3-proxy"]),
Region: defaultEtcdS3.Region,
SecretKey: string(secret.Data["etcd-s3-secret-key"]),
Timeout: *defaultEtcdS3.Timeout.DeepCopy(),
}
// Set endpoint from secret if set
if v, ok := secret.Data["etcd-s3-endpoint"]; ok {
etcdS3.Endpoint = string(v)
}
// Set region from secret if set
if v, ok := secret.Data["etcd-s3-region"]; ok {
etcdS3.Region = string(v)
}
// Set timeout from secret if set
if v, ok := secret.Data["etcd-s3-timeout"]; ok {
if duration, err := time.ParseDuration(string(v)); err != nil {
logrus.Warnf("Failed to parse etcd-s3-timeout value from S3 config secret %s: %v", secretName, err)
} else {
etcdS3.Timeout.Duration = duration
}
}
// configure ssl verification, if value can be parsed
if v, ok := secret.Data["etcd-s3-skip-ssl-verify"]; ok {
if b, err := strconv.ParseBool(string(v)); err != nil {
logrus.Warnf("Failed to parse etcd-s3-skip-ssl-verify value from S3 config secret %s: %v", secretName, err)
} else {
etcdS3.SkipSSLVerify = b
}
}
// configure insecure http, if value can be parsed
if v, ok := secret.Data["etcd-s3-insecure"]; ok {
if b, err := strconv.ParseBool(string(v)); err != nil {
logrus.Warnf("Failed to parse etcd-s3-insecure value from S3 config secret %s: %v", secretName, err)
} else {
etcdS3.Insecure = b
}
}
// encode CA bundles from value, and keys in configmap if one is named
caBundles := []string{}
// Add inline CA bundle if set
if len(secret.Data["etcd-s3-endpoint-ca"]) > 0 {
caBundles = append(caBundles, base64.StdEncoding.EncodeToString(secret.Data["etcd-s3-endpoint-ca"]))
}
// Add CA bundles from named configmap if set
if caConfigMapName := string(secret.Data["etcd-s3-endpoint-ca-name"]); caConfigMapName != "" {
configMap, err := c.core.V1().ConfigMap().Get(metav1.NamespaceSystem, caConfigMapName, metav1.GetOptions{})
if err != nil {
logrus.Warnf("Failed to get ConfigMap %s for etcd-s3-endpoint-ca-name value from S3 config secret %s: %v", caConfigMapName, secretName, err)
} else {
for _, v := range configMap.Data {
caBundles = append(caBundles, base64.StdEncoding.EncodeToString([]byte(v)))
}
for _, v := range configMap.BinaryData {
caBundles = append(caBundles, base64.StdEncoding.EncodeToString(v))
}
}
}
// Concatenate all requested CA bundle strings into config var
etcdS3.EndpointCA = strings.Join(caBundles, " ")
return etcdS3, nil
}

567
pkg/etcd/s3/s3.go Normal file
View File

@ -0,0 +1,567 @@
package s3
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"io/ioutil"
"net/http"
"net/textproto"
"net/url"
"os"
"path"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"
"github.com/rancher/wrangler/pkg/generated/controllers/core"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/lru"
)
var (
clusterIDKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-cluster-id")
tokenHashKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-token-hash")
nodeNameKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-node-name")
)
var defaultEtcdS3 = &config.EtcdS3{
Endpoint: "s3.amazonaws.com",
Region: "us-east-1",
Timeout: metav1.Duration{
Duration: 5 * time.Minute,
},
}
var (
controller *Controller
cErr error
once sync.Once
)
// Controller maintains state for S3 functionality,
// and can be used to get clients for interacting with
// an S3 service, given specific client configuration.
type Controller struct {
clusterID string
tokenHash string
nodeName string
core core.Interface
clientCache *lru.Cache
}
// Client holds state for a given configuration - a preconfigured minio client,
// and reference to the config it was created for.
type Client struct {
mc *minio.Client
etcdS3 *config.EtcdS3
controller *Controller
}
// Start initializes the cache and sets the cluster id and token hash,
// returning a reference to the the initialized controller. Initialization is
// locked by a sync.Once to prevent races, and multiple calls to start will
// return the same controller or error.
func Start(ctx context.Context, config *config.Control) (*Controller, error) {
once.Do(func() {
c := &Controller{
clientCache: lru.New(5),
nodeName: os.Getenv("NODE_NAME"),
}
if config.ClusterReset {
logrus.Debug("Skip setting S3 snapshot cluster ID and server token hash during cluster-reset")
controller = c
} else {
logrus.Debug("Getting S3 snapshot cluster ID and server token hash")
if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
if config.Runtime.Core == nil {
return false, nil
}
c.core = config.Runtime.Core.Core()
// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
ns, err := c.core.V1().Namespace().Get(metav1.NamespaceSystem, metav1.GetOptions{})
if err != nil {
return false, errors.Wrap(err, "failed to set S3 snapshot cluster ID")
}
c.clusterID = string(ns.UID)
tokenHash, err := util.GetTokenHash(config)
if err != nil {
return false, errors.Wrap(err, "failed to set S3 snapshot server token hash")
}
c.tokenHash = tokenHash
return true, nil
}); err != nil {
cErr = err
} else {
controller = c
}
}
})
return controller, cErr
}
func (c *Controller) GetClient(ctx context.Context, etcdS3 *config.EtcdS3) (*Client, error) {
if etcdS3 == nil {
return nil, errors.New("nil s3 configuration")
}
// update ConfigSecret in defaults so that comparisons between current and default config
// ignore ConfigSecret when deciding if CLI configuration is present.
defaultEtcdS3.ConfigSecret = etcdS3.ConfigSecret
// If config is default, try to load config from secret, and fail if it cannot be retrieved or if the secret name is not set.
// If config is not default, and secret name is set, warn that the secret is being ignored
isDefault := reflect.DeepEqual(defaultEtcdS3, etcdS3)
if etcdS3.ConfigSecret != "" {
if isDefault {
e, err := c.getConfigFromSecret(etcdS3.ConfigSecret)
if err != nil {
return nil, errors.Wrapf(err, "failed to get config from etcd-s3-config-secret %q", etcdS3.ConfigSecret)
}
logrus.Infof("Using etcd s3 configuration from etcd-s3-config-secret %q", etcdS3.ConfigSecret)
etcdS3 = e
} else {
logrus.Warnf("Ignoring s3 configuration from etcd-s3-config-secret %q due to existing configuration from CLI or config file", etcdS3.ConfigSecret)
}
} else if isDefault {
return nil, errors.New("s3 configuration was not set")
}
// used just for logging
scheme := "https://"
if etcdS3.Insecure {
scheme = "http://"
}
// Try to get an existing client from cache. The entire EtcdS3 struct
// (including the key id and secret) is used as the cache key, but we only
// print the endpoint and bucket name to avoid leaking creds into the logs.
if client, ok := c.clientCache.Get(*etcdS3); ok {
logrus.Infof("Reusing cached S3 client for endpoint=%q bucket=%q folder=%q", scheme+etcdS3.Endpoint, etcdS3.Bucket, etcdS3.Folder)
return client.(*Client), nil
}
logrus.Infof("Attempting to create new S3 client for endpoint=%q bucket=%q folder=%q", scheme+etcdS3.Endpoint, etcdS3.Bucket, etcdS3.Folder)
if etcdS3.Bucket == "" {
return nil, errors.New("s3 bucket name was not set")
}
tr := http.DefaultTransport.(*http.Transport).Clone()
// You can either disable SSL verification or use a custom CA bundle,
// it doesn't make sense to do both - if verification is disabled,
// the CA is not checked!
if etcdS3.SkipSSLVerify {
tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
} else if etcdS3.EndpointCA != "" {
tlsConfig, err := loadEndpointCAs(etcdS3.EndpointCA)
if err != nil {
return nil, err
}
tr.TLSClientConfig = tlsConfig
}
// Set a fixed proxy URL, if requested by the user. This replaces the default,
// which calls ProxyFromEnvironment to read proxy settings from the environment.
if etcdS3.Proxy != "" {
var u *url.URL
var err error
// proxy address of literal "none" disables all use of a proxy by S3
if etcdS3.Proxy != "none" {
u, err = url.Parse(etcdS3.Proxy)
if err != nil {
return nil, errors.Wrap(err, "failed to parse etcd-s3-proxy value as URL")
}
if u.Scheme == "" || u.Host == "" {
return nil, fmt.Errorf("proxy URL must include scheme and host")
}
}
tr.Proxy = http.ProxyURL(u)
}
var creds *credentials.Credentials
if len(etcdS3.AccessKey) == 0 && len(etcdS3.SecretKey) == 0 {
creds = credentials.NewIAM("") // for running on ec2 instance
if _, err := creds.Get(); err != nil {
return nil, errors.Wrap(err, "failed to get IAM credentials")
}
} else {
creds = credentials.NewStaticV4(etcdS3.AccessKey, etcdS3.SecretKey, "")
}
opt := minio.Options{
Creds: creds,
Secure: !etcdS3.Insecure,
Region: etcdS3.Region,
Transport: tr,
BucketLookup: bucketLookupType(etcdS3.Endpoint),
}
mc, err := minio.New(etcdS3.Endpoint, &opt)
if err != nil {
return nil, err
}
logrus.Infof("Checking if S3 bucket %s exists", etcdS3.Bucket)
ctx, cancel := context.WithTimeout(ctx, etcdS3.Timeout.Duration)
defer cancel()
exists, err := mc.BucketExists(ctx, etcdS3.Bucket)
if err != nil {
return nil, errors.Wrapf(err, "failed to test for existence of bucket %s", etcdS3.Bucket)
}
if !exists {
return nil, fmt.Errorf("bucket %s does not exist", etcdS3.Bucket)
}
logrus.Infof("S3 bucket %s exists", etcdS3.Bucket)
client := &Client{
mc: mc,
etcdS3: etcdS3,
controller: c,
}
logrus.Infof("Adding S3 client to cache")
c.clientCache.Add(*etcdS3, client)
return client, nil
}
// upload uploads the given snapshot to the configured S3
// compatible backend.
func (c *Client) Upload(ctx context.Context, snapshotPath string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshot.File, error) {
basename := filepath.Base(snapshotPath)
metadata := filepath.Join(filepath.Dir(snapshotPath), "..", snapshot.MetadataDir, basename)
snapshotKey := path.Join(c.etcdS3.Folder, basename)
metadataKey := path.Join(c.etcdS3.Folder, snapshot.MetadataDir, basename)
sf := &snapshot.File{
Name: basename,
Location: fmt.Sprintf("s3://%s/%s", c.etcdS3.Bucket, snapshotKey),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
},
S3: &snapshot.S3Config{EtcdS3: *c.etcdS3},
Compressed: strings.HasSuffix(snapshotPath, snapshot.CompressedExtension),
MetadataSource: extraMetadata,
NodeSource: c.controller.nodeName,
}
logrus.Infof("Uploading snapshot to s3://%s/%s", c.etcdS3.Bucket, snapshotKey)
uploadInfo, err := c.uploadSnapshot(ctx, snapshotKey, snapshotPath)
if err != nil {
sf.Status = snapshot.FailedStatus
sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error()))
} else {
sf.Status = snapshot.SuccessfulStatus
sf.Size = uploadInfo.Size
sf.TokenHash = c.controller.tokenHash
}
if uploadInfo, err := c.uploadSnapshotMetadata(ctx, metadataKey, metadata); err != nil {
logrus.Warnf("Failed to upload snapshot metadata to S3: %v", err)
} else if uploadInfo.Size != 0 {
logrus.Infof("Uploaded snapshot metadata s3://%s/%s", c.etcdS3.Bucket, metadataKey)
}
return sf, err
}
// uploadSnapshot uploads the snapshot file to S3 using the minio API.
func (c *Client) uploadSnapshot(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
opts := minio.PutObjectOptions{
NumThreads: 2,
UserMetadata: map[string]string{
clusterIDKey: c.controller.clusterID,
nodeNameKey: c.controller.nodeName,
tokenHashKey: c.controller.tokenHash,
},
}
if strings.HasSuffix(key, snapshot.CompressedExtension) {
opts.ContentType = "application/zip"
} else {
opts.ContentType = "application/octet-stream"
}
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
return c.mc.FPutObject(ctx, c.etcdS3.Bucket, key, path, opts)
}
// uploadSnapshotMetadata marshals and uploads the snapshot metadata to S3 using the minio API.
// The upload is silently skipped if no extra metadata is provided.
func (c *Client) uploadSnapshotMetadata(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return minio.UploadInfo{}, nil
}
return minio.UploadInfo{}, err
}
opts := minio.PutObjectOptions{
NumThreads: 2,
ContentType: "application/json",
UserMetadata: map[string]string{
clusterIDKey: c.controller.clusterID,
nodeNameKey: c.controller.nodeName,
tokenHashKey: c.controller.tokenHash,
},
}
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
return c.mc.FPutObject(ctx, c.etcdS3.Bucket, key, path, opts)
}
// Download downloads the given snapshot from the configured S3
// compatible backend. If the file is successfully downloaded, it returns
// the path the file was downloaded to.
func (c *Client) Download(ctx context.Context, snapshotName, snapshotDir string) (string, error) {
snapshotKey := path.Join(c.etcdS3.Folder, snapshotName)
metadataKey := path.Join(c.etcdS3.Folder, snapshot.MetadataDir, snapshotName)
snapshotFile := filepath.Join(snapshotDir, snapshotName)
metadataFile := filepath.Join(snapshotDir, "..", snapshot.MetadataDir, snapshotName)
if err := c.downloadSnapshot(ctx, snapshotKey, snapshotFile); err != nil {
return "", err
}
if err := c.downloadSnapshotMetadata(ctx, metadataKey, metadataFile); err != nil {
return "", err
}
return snapshotFile, nil
}
// downloadSnapshot downloads the snapshot file from S3 using the minio API.
func (c *Client) downloadSnapshot(ctx context.Context, key, file string) error {
logrus.Debugf("Downloading snapshot from s3://%s/%s", c.etcdS3.Bucket, key)
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
defer os.Chmod(file, 0600)
return c.mc.FGetObject(ctx, c.etcdS3.Bucket, key, file, minio.GetObjectOptions{})
}
// downloadSnapshotMetadata downloads the snapshot metadata file from S3 using the minio API.
// No error is returned if the metadata file does not exist, as it is optional.
func (c *Client) downloadSnapshotMetadata(ctx context.Context, key, file string) error {
logrus.Debugf("Downloading snapshot metadata from s3://%s/%s", c.etcdS3.Bucket, key)
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
defer os.Chmod(file, 0600)
err := c.mc.FGetObject(ctx, c.etcdS3.Bucket, key, file, minio.GetObjectOptions{})
if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound {
return nil
}
return err
}
// SnapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node.
// Returns a list of pruned snapshot names.
func (c *Client) SnapshotRetention(ctx context.Context, retention int, prefix string) ([]string, error) {
if retention < 1 {
return nil, nil
}
prefix = path.Join(c.etcdS3.Folder, prefix)
logrus.Infof("Applying snapshot retention=%d to snapshots stored in s3://%s/%s", retention, c.etcdS3.Bucket, prefix)
var snapshotFiles []minio.ObjectInfo
toCtx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
opts := minio.ListObjectsOptions{
Prefix: prefix,
Recursive: true,
}
for info := range c.mc.ListObjects(toCtx, c.etcdS3.Bucket, opts) {
if info.Err != nil {
return nil, info.Err
}
// skip metadata
if path.Base(path.Dir(info.Key)) == snapshot.MetadataDir {
continue
}
snapshotFiles = append(snapshotFiles, info)
}
if len(snapshotFiles) <= retention {
return nil, nil
}
// sort newest-first so we can prune entries past the retention count
sort.Slice(snapshotFiles, func(i, j int) bool {
return snapshotFiles[j].LastModified.Before(snapshotFiles[i].LastModified)
})
deleted := []string{}
for _, df := range snapshotFiles[retention:] {
logrus.Infof("Removing S3 snapshot: s3://%s/%s", c.etcdS3.Bucket, df.Key)
key := path.Base(df.Key)
if err := c.DeleteSnapshot(ctx, key); err != nil {
return deleted, err
}
deleted = append(deleted, key)
}
return deleted, nil
}
// DeleteSnapshot deletes the selected snapshot (and its metadata) from S3
func (c *Client) DeleteSnapshot(ctx context.Context, key string) error {
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
key = path.Join(c.etcdS3.Folder, key)
err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{})
if err == nil || snapshot.IsNotExist(err) {
metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key))
if merr := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); merr != nil && !snapshot.IsNotExist(merr) {
err = merr
}
}
return err
}
// listSnapshots provides a list of currently stored
// snapshots in S3 along with their relevant
// metadata.
func (c *Client) ListSnapshots(ctx context.Context) (map[string]snapshot.File, error) {
snapshots := map[string]snapshot.File{}
metadatas := []string{}
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
opts := minio.ListObjectsOptions{
Prefix: c.etcdS3.Folder,
Recursive: true,
}
objects := c.mc.ListObjects(ctx, c.etcdS3.Bucket, opts)
for obj := range objects {
if obj.Err != nil {
return nil, obj.Err
}
if obj.Size == 0 {
continue
}
if o, err := c.mc.StatObject(ctx, c.etcdS3.Bucket, obj.Key, minio.StatObjectOptions{}); err != nil {
logrus.Warnf("Failed to get object metadata: %v", err)
} else {
obj = o
}
filename := path.Base(obj.Key)
if path.Base(path.Dir(obj.Key)) == snapshot.MetadataDir {
metadatas = append(metadatas, obj.Key)
continue
}
basename, compressed := strings.CutSuffix(filename, snapshot.CompressedExtension)
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
if err != nil {
ts = obj.LastModified.Unix()
}
sf := snapshot.File{
Name: filename,
Location: fmt.Sprintf("s3://%s/%s", c.etcdS3.Bucket, obj.Key),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: time.Unix(ts, 0),
},
Size: obj.Size,
S3: &snapshot.S3Config{EtcdS3: *c.etcdS3},
Status: snapshot.SuccessfulStatus,
Compressed: compressed,
NodeSource: obj.UserMetadata[nodeNameKey],
TokenHash: obj.UserMetadata[tokenHashKey],
}
sfKey := sf.GenerateConfigMapKey()
snapshots[sfKey] = sf
}
for _, metadataKey := range metadatas {
filename := path.Base(metadataKey)
dsf := &snapshot.File{Name: filename, NodeName: "s3"}
sfKey := dsf.GenerateConfigMapKey()
if sf, ok := snapshots[sfKey]; ok {
logrus.Debugf("Loading snapshot metadata from s3://%s/%s", c.etcdS3.Bucket, metadataKey)
if obj, err := c.mc.GetObject(ctx, c.etcdS3.Bucket, metadataKey, minio.GetObjectOptions{}); err != nil {
if snapshot.IsNotExist(err) {
logrus.Debugf("Failed to get snapshot metadata: %v", err)
} else {
logrus.Warnf("Failed to get snapshot metadata for %s: %v", filename, err)
}
} else {
if m, err := ioutil.ReadAll(obj); err != nil {
if snapshot.IsNotExist(err) {
logrus.Debugf("Failed to read snapshot metadata: %v", err)
} else {
logrus.Warnf("Failed to read snapshot metadata for %s: %v", filename, err)
}
} else {
sf.Metadata = base64.StdEncoding.EncodeToString(m)
snapshots[sfKey] = sf
}
}
}
}
return snapshots, nil
}
func loadEndpointCAs(etcdS3EndpointCA string) (*tls.Config, error) {
var loaded bool
certPool := x509.NewCertPool()
for _, ca := range strings.Split(etcdS3EndpointCA, " ") {
// Try to decode the value as base64-encoded data - yes, a base64 string that itself
// contains multiline, ascii-armored, base64-encoded certificate data - as would be produced
// by `base64 --wrap=0 /path/to/cert.pem`. If this fails, assume the value is the path to a
// file on disk, and try to read that. This is backwards compatible with RKE1.
caData, err := base64.StdEncoding.DecodeString(ca)
if err != nil {
caData, err = os.ReadFile(ca)
}
if err != nil {
return nil, err
}
if certPool.AppendCertsFromPEM(caData) {
loaded = true
}
}
if loaded {
return &tls.Config{RootCAs: certPool}, nil
}
return nil, errors.New("no certificates loaded from etcd-s3-endpoint-ca")
}
func bucketLookupType(endpoint string) minio.BucketLookupType {
if strings.Contains(endpoint, "aliyun") { // backwards compatible with RKE1
return minio.BucketLookupDNS
}
return minio.BucketLookupAuto
}

1743
pkg/etcd/s3/s3_test.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -3,14 +3,11 @@ package etcd
import (
"archive/zip"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"os"
"path/filepath"
"runtime"
@ -22,38 +19,31 @@ import (
k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
"github.com/k3s-io/k3s/pkg/cluster/managed"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/etcd/s3"
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/minio/minio-go/v7"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/etcdutl/v3/snapshot"
snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/utils/ptr"
)
const (
compressedExtension = ".zip"
metadataDir = ".metadata"
errorTTL = 24 * time.Hour
errorTTL = 24 * time.Hour
)
var (
snapshotExtraMetadataConfigMapName = version.Program + "-etcd-snapshot-extra-metadata"
labelStorageNode = "etcd." + version.Program + ".cattle.io/snapshot-storage-node"
annotationLocalReconciled = "etcd." + version.Program + ".cattle.io/local-snapshots-timestamp"
annotationS3Reconciled = "etcd." + version.Program + ".cattle.io/s3-snapshots-timestamp"
annotationTokenHash = "etcd." + version.Program + ".cattle.io/snapshot-token-hash"
annotationLocalReconciled = "etcd." + version.Program + ".cattle.io/local-snapshots-timestamp"
annotationS3Reconciled = "etcd." + version.Program + ".cattle.io/s3-snapshots-timestamp"
// snapshotDataBackoff will retry at increasing steps for up to ~30 seconds.
// If the ConfigMap update fails, the list won't be reconciled again until next time
@ -109,7 +99,7 @@ func snapshotDir(config *config.Control, create bool) (string, error) {
func (e *ETCD) compressSnapshot(snapshotDir, snapshotName, snapshotPath string, now time.Time) (string, error) {
logrus.Info("Compressing etcd snapshot file: " + snapshotName)
zippedSnapshotName := snapshotName + compressedExtension
zippedSnapshotName := snapshotName + snapshot.CompressedExtension
zipPath := filepath.Join(snapshotDir, zippedSnapshotName)
zf, err := os.Create(zipPath)
@ -168,7 +158,7 @@ func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, err
var decompressed *os.File
for _, sf := range r.File {
decompressed, err = os.OpenFile(strings.Replace(sf.Name, compressedExtension, "", -1), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, sf.Mode())
decompressed, err = os.OpenFile(strings.Replace(sf.Name, snapshot.CompressedExtension, "", -1), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, sf.Mode())
if err != nil {
return "", err
}
@ -203,13 +193,13 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
// make sure the core.Factory is initialized before attempting to add snapshot metadata
var extraMetadata *v1.ConfigMap
if e.config.Runtime.Core == nil {
logrus.Debugf("Cannot retrieve extra metadata from %s ConfigMap: runtime core not ready", snapshotExtraMetadataConfigMapName)
logrus.Debugf("Cannot retrieve extra metadata from %s ConfigMap: runtime core not ready", snapshot.ExtraMetadataConfigMapName)
} else {
logrus.Debugf("Attempting to retrieve extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName)
if snapshotExtraMetadataConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotExtraMetadataConfigMapName, metav1.GetOptions{}); err != nil {
logrus.Debugf("Error encountered attempting to retrieve extra metadata from %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
logrus.Debugf("Attempting to retrieve extra metadata from %s ConfigMap", snapshot.ExtraMetadataConfigMapName)
if snapshotExtraMetadataConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshot.ExtraMetadataConfigMapName, metav1.GetOptions{}); err != nil {
logrus.Debugf("Error encountered attempting to retrieve extra metadata from %s ConfigMap, error: %v", snapshot.ExtraMetadataConfigMapName, err)
} else {
logrus.Debugf("Setting extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName)
logrus.Debugf("Setting extra metadata from %s ConfigMap", snapshot.ExtraMetadataConfigMapName)
extraMetadata = snapshotExtraMetadataConfigMap
}
}
@ -246,20 +236,20 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
snapshotPath := filepath.Join(snapshotDir, snapshotName)
logrus.Infof("Saving etcd snapshot to %s", snapshotPath)
var sf *snapshotFile
var sf *snapshot.File
if err := snapshot.NewV3(e.client.GetLogger()).Save(ctx, *cfg, snapshotPath); err != nil {
sf = &snapshotFile{
if err := snapshotv3.NewV3(e.client.GetLogger()).Save(ctx, *cfg, snapshotPath); err != nil {
sf = &snapshot.File{
Name: snapshotName,
Location: "",
NodeName: nodeName,
CreatedAt: &metav1.Time{
Time: now,
},
Status: failedSnapshotStatus,
Status: snapshot.FailedStatus,
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
Size: 0,
metadataSource: extraMetadata,
MetadataSource: extraMetadata,
}
logrus.Errorf("Failed to take etcd snapshot: %v", err)
if err := e.addSnapshotData(*sf); err != nil {
@ -290,18 +280,18 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
return nil, errors.Wrap(err, "unable to retrieve snapshot information from local snapshot")
}
sf = &snapshotFile{
sf = &snapshot.File{
Name: f.Name(),
Location: "file://" + snapshotPath,
NodeName: nodeName,
CreatedAt: &metav1.Time{
Time: now,
},
Status: successfulSnapshotStatus,
Status: snapshot.SuccessfulStatus,
Size: f.Size(),
Compressed: e.config.EtcdSnapshotCompress,
metadataSource: extraMetadata,
tokenHash: tokenHash,
MetadataSource: extraMetadata,
TokenHash: tokenHash,
}
res.Created = append(res.Created, sf.Name)
@ -323,34 +313,29 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
}
res.Deleted = append(res.Deleted, deleted...)
if e.config.EtcdS3 {
if err := e.initS3IfNil(ctx); err != nil {
if e.config.EtcdS3 != nil {
if s3client, err := e.getS3Client(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
sf = &snapshotFile{
Name: f.Name(),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
},
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
Size: 0,
Status: failedSnapshotStatus,
S3: &s3Config{
Endpoint: e.config.EtcdS3Endpoint,
EndpointCA: e.config.EtcdS3EndpointCA,
SkipSSLVerify: e.config.EtcdS3SkipSSLVerify,
Bucket: e.config.EtcdS3BucketName,
Region: e.config.EtcdS3Region,
Folder: e.config.EtcdS3Folder,
Insecure: e.config.EtcdS3Insecure,
},
metadataSource: extraMetadata,
if !errors.Is(err, s3.ErrNoConfigSecret) {
err = errors.Wrap(err, "failed to initialize S3 client")
sf = &snapshot.File{
Name: f.Name(),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
},
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
Size: 0,
Status: snapshot.FailedStatus,
S3: &snapshot.S3Config{EtcdS3: *e.config.EtcdS3},
MetadataSource: extraMetadata,
}
}
} else {
logrus.Infof("Saving etcd snapshot %s to S3", snapshotName)
// upload will return a snapshotFile even on error - if there was an
// upload will return a snapshot.File even on error - if there was an
// error, it will be reflected in the status and message.
sf, err = e.s3.upload(ctx, snapshotPath, extraMetadata, now)
sf, err = s3client.Upload(ctx, snapshotPath, extraMetadata, now)
if err != nil {
logrus.Errorf("Error received during snapshot upload to S3: %s", err)
} else {
@ -360,7 +345,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
// Attempt to apply retention even if the upload failed; failure may be due to bucket
// being full or some other condition that retention policy would resolve.
// Snapshot retention may prune some files before returning an error. Failing to prune is not fatal.
deleted, err := e.s3.snapshotRetention(ctx)
deleted, err := s3client.SnapshotRetention(ctx, e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName)
res.Deleted = append(res.Deleted, deleted...)
if err != nil {
logrus.Warnf("Failed to apply s3 snapshot retention policy: %v", err)
@ -378,52 +363,12 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
return res, e.ReconcileSnapshotData(ctx)
}
type s3Config struct {
Endpoint string `json:"endpoint,omitempty"`
EndpointCA string `json:"endpointCA,omitempty"`
SkipSSLVerify bool `json:"skipSSLVerify,omitempty"`
Bucket string `json:"bucket,omitempty"`
Region string `json:"region,omitempty"`
Folder string `json:"folder,omitempty"`
Insecure bool `json:"insecure,omitempty"`
}
type snapshotStatus string
const (
successfulSnapshotStatus snapshotStatus = "successful"
failedSnapshotStatus snapshotStatus = "failed"
)
// snapshotFile represents a single snapshot and it's
// metadata.
type snapshotFile struct {
Name string `json:"name"`
// Location contains the full path of the snapshot. For
// local paths, the location will be prefixed with "file://".
Location string `json:"location,omitempty"`
Metadata string `json:"metadata,omitempty"`
Message string `json:"message,omitempty"`
NodeName string `json:"nodeName,omitempty"`
CreatedAt *metav1.Time `json:"createdAt,omitempty"`
Size int64 `json:"size,omitempty"`
Status snapshotStatus `json:"status,omitempty"`
S3 *s3Config `json:"s3Config,omitempty"`
Compressed bool `json:"compressed"`
// these fields are used for the internal representation of the snapshot
// to populate other fields before serialization to the legacy configmap.
metadataSource *v1.ConfigMap `json:"-"`
nodeSource string `json:"-"`
tokenHash string `json:"-"`
}
// listLocalSnapshots provides a list of the currently stored
// snapshots on disk along with their relevant
// metadata.
func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) {
func (e *ETCD) listLocalSnapshots() (map[string]snapshot.File, error) {
nodeName := os.Getenv("NODE_NAME")
snapshots := make(map[string]snapshotFile)
snapshots := make(map[string]snapshot.File)
snapshotDir, err := snapshotDir(e.config, true)
if err != nil {
return snapshots, errors.Wrap(err, "failed to get etcd-snapshot-dir")
@ -434,7 +379,7 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) {
return err
}
basename, compressed := strings.CutSuffix(file.Name(), compressedExtension)
basename, compressed := strings.CutSuffix(file.Name(), snapshot.CompressedExtension)
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
if err != nil {
ts = file.ModTime().Unix()
@ -443,13 +388,13 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) {
// try to read metadata from disk; don't warn if it is missing as it will not exist
// for snapshot files from old releases or if there was no metadata provided.
var metadata string
metadataFile := filepath.Join(filepath.Dir(path), "..", metadataDir, file.Name())
metadataFile := filepath.Join(filepath.Dir(path), "..", snapshot.MetadataDir, file.Name())
if m, err := os.ReadFile(metadataFile); err == nil {
logrus.Debugf("Loading snapshot metadata from %s", metadataFile)
metadata = base64.StdEncoding.EncodeToString(m)
}
sf := snapshotFile{
sf := snapshot.File{
Name: file.Name(),
Location: "file://" + filepath.Join(snapshotDir, file.Name()),
NodeName: nodeName,
@ -458,10 +403,10 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) {
Time: time.Unix(ts, 0),
},
Size: file.Size(),
Status: successfulSnapshotStatus,
Status: snapshot.SuccessfulStatus,
Compressed: compressed,
}
sfKey := generateSnapshotConfigMapKey(sf)
sfKey := sf.GenerateConfigMapKey()
snapshots[sfKey] = sf
return nil
}); err != nil {
@ -471,18 +416,21 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) {
return snapshots, nil
}
// initS3IfNil initializes the S3 client
// if it hasn't yet been initialized.
func (e *ETCD) initS3IfNil(ctx context.Context) error {
if e.config.EtcdS3 && e.s3 == nil {
s3, err := NewS3(ctx, e.config)
// getS3Client initializes the S3 controller if it hasn't yet been initialized.
// If S3 is or can be initialized successfully, and valid S3 configuration is
// present, a client for the current S3 configuration is returned.
// The context passed here is only used to validate the configuration,
// it does not need to continue to remain uncancelled after the call returns.
func (e *ETCD) getS3Client(ctx context.Context) (*s3.Client, error) {
if e.s3 == nil {
s3, err := s3.Start(ctx, e.config)
if err != nil {
return err
return nil, err
}
e.s3 = s3
}
return nil
return e.s3.GetClient(ctx, e.config.EtcdS3)
}
// PruneSnapshots deleted old snapshots in excess of the configured retention count.
@ -502,11 +450,11 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) (*managed.SnapshotResult, err
logrus.Errorf("Error applying snapshot retention policy: %v", err)
}
if e.config.EtcdS3 {
if err := e.initS3IfNil(ctx); err != nil {
if e.config.EtcdS3 != nil {
if s3client, err := e.getS3Client(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
} else {
deleted, err := e.s3.snapshotRetention(ctx)
deleted, err := s3client.SnapshotRetention(ctx, e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName)
if err != nil {
logrus.Errorf("Error applying S3 snapshot retention policy: %v", err)
}
@ -524,19 +472,23 @@ func (e *ETCD) ListSnapshots(ctx context.Context) (*k3s.ETCDSnapshotFileList, er
snapshotFiles := &k3s.ETCDSnapshotFileList{
TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "List"},
}
if e.config.EtcdS3 {
if err := e.initS3IfNil(ctx); err != nil {
if e.config.EtcdS3 != nil {
if s3client, err := e.getS3Client(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
return nil, err
}
sfs, err := e.s3.listSnapshots(ctx)
if err != nil {
return nil, err
}
for k, sf := range sfs {
esf := k3s.NewETCDSnapshotFile("", k, k3s.ETCDSnapshotFile{})
sf.toETCDSnapshotFile(esf)
snapshotFiles.Items = append(snapshotFiles.Items, *esf)
if !errors.Is(err, s3.ErrNoConfigSecret) {
return nil, errors.Wrap(err, "failed to initialize S3 client")
}
} else {
sfs, err := s3client.ListSnapshots(ctx)
if err != nil {
return nil, err
}
for k, sf := range sfs {
esf := k3s.NewETCDSnapshotFile("", k, k3s.ETCDSnapshotFile{})
sf.ToETCDSnapshotFile(esf)
snapshotFiles.Items = append(snapshotFiles.Items, *esf)
}
}
}
@ -546,7 +498,7 @@ func (e *ETCD) ListSnapshots(ctx context.Context) (*k3s.ETCDSnapshotFileList, er
}
for k, sf := range sfs {
esf := k3s.NewETCDSnapshotFile("", k, k3s.ETCDSnapshotFile{})
sf.toETCDSnapshotFile(esf)
sf.ToETCDSnapshotFile(esf)
snapshotFiles.Items = append(snapshotFiles.Items, *esf)
}
@ -561,17 +513,22 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) (*manage
if err != nil {
return nil, errors.Wrap(err, "failed to get etcd-snapshot-dir")
}
if e.config.EtcdS3 {
if err := e.initS3IfNil(ctx); err != nil {
var s3client *s3.Client
if e.config.EtcdS3 != nil {
s3client, err = e.getS3Client(ctx)
if err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
return nil, err
if !errors.Is(err, s3.ErrNoConfigSecret) {
return nil, errors.Wrap(err, "failed to initialize S3 client")
}
}
}
res := &managed.SnapshotResult{}
for _, s := range snapshots {
if err := e.deleteSnapshot(filepath.Join(snapshotDir, s)); err != nil {
if isNotExist(err) {
if snapshot.IsNotExist(err) {
logrus.Infof("Snapshot %s not found locally", s)
} else {
logrus.Errorf("Failed to delete local snapshot %s: %v", s, err)
@ -581,9 +538,9 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) (*manage
logrus.Infof("Snapshot %s deleted locally", s)
}
if e.config.EtcdS3 {
if err := e.s3.deleteSnapshot(ctx, s); err != nil {
if isNotExist(err) {
if s3client != nil {
if err := s3client.DeleteSnapshot(ctx, s); err != nil {
if snapshot.IsNotExist(err) {
logrus.Infof("Snapshot %s not found in S3", s)
} else {
logrus.Errorf("Failed to delete S3 snapshot %s: %v", s, err)
@ -599,13 +556,13 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) (*manage
}
func (e *ETCD) deleteSnapshot(snapshotPath string) error {
dir := filepath.Join(filepath.Dir(snapshotPath), "..", metadataDir)
dir := filepath.Join(filepath.Dir(snapshotPath), "..", snapshot.MetadataDir)
filename := filepath.Base(snapshotPath)
metadataPath := filepath.Join(dir, filename)
err := os.Remove(snapshotPath)
if err == nil || os.IsNotExist(err) {
if merr := os.Remove(metadataPath); err != nil && !isNotExist(err) {
if merr := os.Remove(metadataPath); err != nil && !snapshot.IsNotExist(err) {
err = merr
}
}
@ -613,27 +570,16 @@ func (e *ETCD) deleteSnapshot(snapshotPath string) error {
return err
}
func marshalSnapshotFile(sf snapshotFile) ([]byte, error) {
if sf.metadataSource != nil {
if m, err := json.Marshal(sf.metadataSource.Data); err != nil {
logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
} else {
sf.Metadata = base64.StdEncoding.EncodeToString(m)
}
}
return json.Marshal(sf)
}
// addSnapshotData syncs an internal snapshotFile representation to an ETCDSnapshotFile resource
// of the same name. Resources will be created or updated as necessary.
func (e *ETCD) addSnapshotData(sf snapshotFile) error {
func (e *ETCD) addSnapshotData(sf snapshot.File) error {
// make sure the K3s factory is initialized.
for e.config.Runtime.K3s == nil {
runtime.Gosched()
}
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
esfName := generateSnapshotName(sf)
esfName := sf.GenerateName()
var esf *k3s.ETCDSnapshotFile
return retry.OnError(snapshotDataBackoff, func(err error) bool {
@ -654,7 +600,7 @@ func (e *ETCD) addSnapshotData(sf snapshotFile) error {
// mutate object
existing := esf.DeepCopyObject()
sf.toETCDSnapshotFile(esf)
sf.ToETCDSnapshotFile(esf)
// create or update as necessary
if esf.CreationTimestamp.IsZero() {
@ -671,48 +617,10 @@ func (e *ETCD) addSnapshotData(sf snapshotFile) error {
})
}
// generateSnapshotConfigMapKey generates a derived name for the snapshot that is safe for use
// as a configmap key.
func generateSnapshotConfigMapKey(sf snapshotFile) string {
name := invalidKeyChars.ReplaceAllString(sf.Name, "_")
if sf.NodeName == "s3" {
return "s3-" + name
}
return "local-" + name
}
// generateSnapshotName generates a derived name for the snapshot that is safe for use
// as a resource name.
func generateSnapshotName(sf snapshotFile) string {
name := strings.ToLower(sf.Name)
nodename := sf.nodeSource
if nodename == "" {
nodename = sf.NodeName
}
// Include a digest of the hostname and location to ensure unique resource
// names. Snapshots should already include the hostname, but this ensures we
// don't accidentally hide records if a snapshot with the same name somehow
// exists on multiple nodes.
digest := sha256.Sum256([]byte(nodename + sf.Location))
// If the lowercase filename isn't usable as a resource name, and short enough that we can include a prefix and suffix,
// generate a safe name derived from the hostname and timestamp.
if errs := validation.IsDNS1123Subdomain(name); len(errs) != 0 || len(name)+13 > validation.DNS1123SubdomainMaxLength {
nodename, _, _ := strings.Cut(nodename, ".")
name = fmt.Sprintf("etcd-snapshot-%s-%d", nodename, sf.CreatedAt.Unix())
if sf.Compressed {
name += compressedExtension
}
}
if sf.NodeName == "s3" {
return "s3-" + name + "-" + hex.EncodeToString(digest[0:])[0:6]
}
return "local-" + name + "-" + hex.EncodeToString(digest[0:])[0:6]
}
// generateETCDSnapshotFileConfigMapKey generates a key that the corresponding
// snapshotFile would be stored under in the legacy configmap
func generateETCDSnapshotFileConfigMapKey(esf k3s.ETCDSnapshotFile) string {
name := invalidKeyChars.ReplaceAllString(esf.Spec.SnapshotName, "_")
name := snapshot.InvalidKeyChars.ReplaceAllString(esf.Spec.SnapshotName, "_")
if esf.Spec.S3 != nil {
return "s3-" + name
}
@ -757,19 +665,21 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
nodeNames := []string{os.Getenv("NODE_NAME")}
// Get snapshots from S3
if e.config.EtcdS3 {
if err := e.initS3IfNil(ctx); err != nil {
if e.config.EtcdS3 != nil {
if s3client, err := e.getS3Client(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
return err
}
if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil {
logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err)
} else {
for k, v := range s3Snapshots {
snapshotFiles[k] = v
if !errors.Is(err, s3.ErrNoConfigSecret) {
return errors.Wrap(err, "failed to initialize S3 client")
}
} else {
if s3Snapshots, err := s3client.ListSnapshots(ctx); err != nil {
logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err)
} else {
for k, v := range s3Snapshots {
snapshotFiles[k] = v
}
nodeNames = append(nodeNames, "s3")
}
nodeNames = append(nodeNames, "s3")
}
}
@ -784,9 +694,9 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
for sfKey, sf := range snapshotFiles {
logrus.Debugf("Found snapshotFile for %s with key %s", sf.Name, sfKey)
// if the configmap has data for this snapshot, and local metadata is empty,
// deserialize the value from the configmap and attempt to load it.
if cmSnapshotValue := snapshotConfigMap.Data[sfKey]; cmSnapshotValue != "" && sf.Metadata == "" && sf.metadataSource == nil {
sfTemp := &snapshotFile{}
// deserialize the value from the configmap and attempt to load iM.
if cmSnapshotValue := snapshotConfigMap.Data[sfKey]; cmSnapshotValue != "" && sf.Metadata == "" && sf.MetadataSource == nil {
sfTemp := &snapshot.File{}
if err := json.Unmarshal([]byte(cmSnapshotValue), sfTemp); err != nil {
logrus.Warnf("Failed to unmarshal configmap data for snapshot %s: %v", sfKey, err)
continue
@ -799,7 +709,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
labelSelector := &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{{
Key: labelStorageNode,
Key: snapshot.LabelStorageNode,
Operator: metav1.LabelSelectorOpIn,
Values: nodeNames,
}},
@ -823,7 +733,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
for _, esf := range esfList.Items {
sfKey := generateETCDSnapshotFileConfigMapKey(esf)
logrus.Debugf("Found ETCDSnapshotFile for %s with key %s", esf.Spec.SnapshotName, sfKey)
if sf, ok := snapshotFiles[sfKey]; ok && generateSnapshotName(sf) == esf.Name {
if sf, ok := snapshotFiles[sfKey]; ok && sf.GenerateName() == esf.Name {
// exists in both and names match, don't need to sync
delete(snapshotFiles, sfKey)
} else {
@ -835,7 +745,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
}
}
if ok {
logrus.Debugf("Name of ETCDSnapshotFile for snapshotFile with key %s does not match: %s vs %s", sfKey, generateSnapshotName(sf), esf.Name)
logrus.Debugf("Name of ETCDSnapshotFile for snapshotFile with key %s does not match: %s vs %s", sfKey, sf.GenerateName(), esf.Name)
} else {
logrus.Debugf("Key %s not found in snapshotFile list", sfKey)
}
@ -904,7 +814,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
"path": "/metadata/annotations/" + strings.ReplaceAll(annotationLocalReconciled, "/", "~1"),
},
}
if e.config.EtcdS3 {
if e.config.EtcdS3 != nil {
patch = append(patch, map[string]string{
"op": "add",
"value": now,
@ -942,18 +852,18 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)
logrus.Infof("Applying snapshot retention=%d to local snapshots with prefix %s in %s", retention, snapshotPrefix, snapshotDir)
var snapshotFiles []snapshotFile
var snapshotFiles []snapshot.File
if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error {
if info.IsDir() || err != nil {
return err
}
if strings.HasPrefix(info.Name(), snapshotPrefix) {
basename, compressed := strings.CutSuffix(info.Name(), compressedExtension)
basename, compressed := strings.CutSuffix(info.Name(), snapshot.CompressedExtension)
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
if err != nil {
ts = info.ModTime().Unix()
}
snapshotFiles = append(snapshotFiles, snapshotFile{Name: info.Name(), CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed})
snapshotFiles = append(snapshotFiles, snapshot.File{Name: info.Name(), CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed})
}
return nil
}); err != nil {
@ -971,7 +881,7 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)
deleted := []string{}
for _, df := range snapshotFiles[retention:] {
snapshotPath := filepath.Join(snapshotDir, df.Name)
metadataPath := filepath.Join(snapshotDir, "..", metadataDir, df.Name)
metadataPath := filepath.Join(snapshotDir, "..", snapshot.MetadataDir, df.Name)
logrus.Infof("Removing local snapshot %s", snapshotPath)
if err := os.Remove(snapshotPath); err != nil {
return deleted, err
@ -985,13 +895,6 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)
return deleted, nil
}
func isNotExist(err error) bool {
if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound || os.IsNotExist(err) {
return true
}
return false
}
// saveSnapshotMetadata writes extra metadata to disk.
// The upload is silently skipped if no extra metadata is provided.
func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) error {
@ -999,7 +902,7 @@ func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) erro
return nil
}
dir := filepath.Join(filepath.Dir(snapshotPath), "..", metadataDir)
dir := filepath.Join(filepath.Dir(snapshotPath), "..", snapshot.MetadataDir)
filename := filepath.Base(snapshotPath)
metadataPath := filepath.Join(dir, filename)
logrus.Infof("Saving snapshot metadata to %s", metadataPath)
@ -1012,135 +915,3 @@ func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) erro
}
return os.WriteFile(metadataPath, m, 0700)
}
func (sf *snapshotFile) fromETCDSnapshotFile(esf *k3s.ETCDSnapshotFile) {
if esf == nil {
panic("cannot convert from nil ETCDSnapshotFile")
}
sf.Name = esf.Spec.SnapshotName
sf.Location = esf.Spec.Location
sf.CreatedAt = esf.Status.CreationTime
sf.nodeSource = esf.Spec.NodeName
sf.Compressed = strings.HasSuffix(esf.Spec.SnapshotName, compressedExtension)
if esf.Status.ReadyToUse != nil && *esf.Status.ReadyToUse {
sf.Status = successfulSnapshotStatus
} else {
sf.Status = failedSnapshotStatus
}
if esf.Status.Size != nil {
sf.Size = esf.Status.Size.Value()
}
if esf.Status.Error != nil {
if esf.Status.Error.Time != nil {
sf.CreatedAt = esf.Status.Error.Time
}
message := "etcd snapshot failed"
if esf.Status.Error.Message != nil {
message = *esf.Status.Error.Message
}
sf.Message = base64.StdEncoding.EncodeToString([]byte(message))
}
if len(esf.Spec.Metadata) > 0 {
if b, err := json.Marshal(esf.Spec.Metadata); err != nil {
logrus.Warnf("Failed to marshal metadata for %s: %v", esf.Name, err)
} else {
sf.Metadata = base64.StdEncoding.EncodeToString(b)
}
}
if tokenHash := esf.Annotations[annotationTokenHash]; tokenHash != "" {
sf.tokenHash = tokenHash
}
if esf.Spec.S3 == nil {
sf.NodeName = esf.Spec.NodeName
} else {
sf.NodeName = "s3"
sf.S3 = &s3Config{
Endpoint: esf.Spec.S3.Endpoint,
EndpointCA: esf.Spec.S3.EndpointCA,
SkipSSLVerify: esf.Spec.S3.SkipSSLVerify,
Bucket: esf.Spec.S3.Bucket,
Region: esf.Spec.S3.Region,
Folder: esf.Spec.S3.Prefix,
Insecure: esf.Spec.S3.Insecure,
}
}
}
func (sf *snapshotFile) toETCDSnapshotFile(esf *k3s.ETCDSnapshotFile) {
if esf == nil {
panic("cannot convert to nil ETCDSnapshotFile")
}
esf.Spec.SnapshotName = sf.Name
esf.Spec.Location = sf.Location
esf.Status.CreationTime = sf.CreatedAt
esf.Status.ReadyToUse = ptr.To(sf.Status == successfulSnapshotStatus)
esf.Status.Size = resource.NewQuantity(sf.Size, resource.DecimalSI)
if sf.nodeSource != "" {
esf.Spec.NodeName = sf.nodeSource
} else {
esf.Spec.NodeName = sf.NodeName
}
if sf.Message != "" {
var message string
b, err := base64.StdEncoding.DecodeString(sf.Message)
if err != nil {
logrus.Warnf("Failed to decode error message for %s: %v", sf.Name, err)
message = "etcd snapshot failed"
} else {
message = string(b)
}
esf.Status.Error = &k3s.ETCDSnapshotError{
Time: sf.CreatedAt,
Message: &message,
}
}
if sf.metadataSource != nil {
esf.Spec.Metadata = sf.metadataSource.Data
} else if sf.Metadata != "" {
metadata, err := base64.StdEncoding.DecodeString(sf.Metadata)
if err != nil {
logrus.Warnf("Failed to decode metadata for %s: %v", sf.Name, err)
} else {
if err := json.Unmarshal(metadata, &esf.Spec.Metadata); err != nil {
logrus.Warnf("Failed to unmarshal metadata for %s: %v", sf.Name, err)
}
}
}
if esf.ObjectMeta.Labels == nil {
esf.ObjectMeta.Labels = map[string]string{}
}
if esf.ObjectMeta.Annotations == nil {
esf.ObjectMeta.Annotations = map[string]string{}
}
if sf.tokenHash != "" {
esf.ObjectMeta.Annotations[annotationTokenHash] = sf.tokenHash
}
if sf.S3 == nil {
esf.ObjectMeta.Labels[labelStorageNode] = esf.Spec.NodeName
} else {
esf.ObjectMeta.Labels[labelStorageNode] = "s3"
esf.Spec.S3 = &k3s.ETCDSnapshotS3{
Endpoint: sf.S3.Endpoint,
EndpointCA: sf.S3.EndpointCA,
SkipSSLVerify: sf.S3.SkipSSLVerify,
Bucket: sf.S3.Bucket,
Region: sf.S3.Region,
Prefix: sf.S3.Folder,
Insecure: sf.S3.Insecure,
}
}
}

270
pkg/etcd/snapshot/types.go Normal file
View File

@ -0,0 +1,270 @@
package snapshot
import (
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"os"
"regexp"
"strings"
k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/version"
"github.com/minio/minio-go/v7"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/utils/ptr"
)
type SnapshotStatus string
const (
SuccessfulStatus SnapshotStatus = "successful"
FailedStatus SnapshotStatus = "failed"
CompressedExtension = ".zip"
MetadataDir = ".metadata"
)
var (
InvalidKeyChars = regexp.MustCompile(`[^-._a-zA-Z0-9]`)
LabelStorageNode = "etcd." + version.Program + ".cattle.io/snapshot-storage-node"
AnnotationTokenHash = "etcd." + version.Program + ".cattle.io/snapshot-token-hash"
ExtraMetadataConfigMapName = version.Program + "-etcd-snapshot-extra-metadata"
)
type S3Config struct {
config.EtcdS3
// Mask these fields in the embedded struct to avoid serializing their values in the snapshotFile record
AccessKey string `json:"accessKey,omitempty"`
ConfigSecret string `json:"configSecret,omitempty"`
Proxy string `json:"proxy,omitempty"`
SecretKey string `json:"secretKey,omitempty"`
Timeout metav1.Duration `json:"timeout,omitempty"`
}
// File represents a single snapshot and it's
// metadata.
type File struct {
Name string `json:"name"`
// Location contains the full path of the snapshot. For
// local paths, the location will be prefixed with "file://".
Location string `json:"location,omitempty"`
Metadata string `json:"metadata,omitempty"`
Message string `json:"message,omitempty"`
NodeName string `json:"nodeName,omitempty"`
CreatedAt *metav1.Time `json:"createdAt,omitempty"`
Size int64 `json:"size,omitempty"`
Status SnapshotStatus `json:"status,omitempty"`
S3 *S3Config `json:"s3Config,omitempty"`
Compressed bool `json:"compressed"`
// these fields are used for the internal representation of the snapshot
// to populate other fields before serialization to the legacy configmap.
MetadataSource *v1.ConfigMap `json:"-"`
NodeSource string `json:"-"`
TokenHash string `json:"-"`
}
// GenerateConfigMapKey generates a derived name for the snapshot that is safe for use
// as a configmap key.
func (sf *File) GenerateConfigMapKey() string {
name := InvalidKeyChars.ReplaceAllString(sf.Name, "_")
if sf.NodeName == "s3" {
return "s3-" + name
}
return "local-" + name
}
// GenerateName generates a derived name for the snapshot that is safe for use
// as a resource name.
func (sf *File) GenerateName() string {
name := strings.ToLower(sf.Name)
nodename := sf.NodeSource
if nodename == "" {
nodename = sf.NodeName
}
// Include a digest of the hostname and location to ensure unique resource
// names. Snapshots should already include the hostname, but this ensures we
// don't accidentally hide records if a snapshot with the same name somehow
// exists on multiple nodes.
digest := sha256.Sum256([]byte(nodename + sf.Location))
// If the lowercase filename isn't usable as a resource name, and short enough that we can include a prefix and suffix,
// generate a safe name derived from the hostname and timestamp.
if errs := validation.IsDNS1123Subdomain(name); len(errs) != 0 || len(name)+13 > validation.DNS1123SubdomainMaxLength {
nodename, _, _ := strings.Cut(nodename, ".")
name = fmt.Sprintf("etcd-snapshot-%s-%d", nodename, sf.CreatedAt.Unix())
if sf.Compressed {
name += CompressedExtension
}
}
if sf.NodeName == "s3" {
return "s3-" + name + "-" + hex.EncodeToString(digest[0:])[0:6]
}
return "local-" + name + "-" + hex.EncodeToString(digest[0:])[0:6]
}
// FromETCDSnapshotFile translates fields to the File from the ETCDSnapshotFile
func (sf *File) FromETCDSnapshotFile(esf *k3s.ETCDSnapshotFile) {
if esf == nil {
panic("cannot convert from nil ETCDSnapshotFile")
}
sf.Name = esf.Spec.SnapshotName
sf.Location = esf.Spec.Location
sf.CreatedAt = esf.Status.CreationTime
sf.NodeSource = esf.Spec.NodeName
sf.Compressed = strings.HasSuffix(esf.Spec.SnapshotName, CompressedExtension)
if esf.Status.ReadyToUse != nil && *esf.Status.ReadyToUse {
sf.Status = SuccessfulStatus
} else {
sf.Status = FailedStatus
}
if esf.Status.Size != nil {
sf.Size = esf.Status.Size.Value()
}
if esf.Status.Error != nil {
if esf.Status.Error.Time != nil {
sf.CreatedAt = esf.Status.Error.Time
}
message := "etcd snapshot failed"
if esf.Status.Error.Message != nil {
message = *esf.Status.Error.Message
}
sf.Message = base64.StdEncoding.EncodeToString([]byte(message))
}
if len(esf.Spec.Metadata) > 0 {
if b, err := json.Marshal(esf.Spec.Metadata); err != nil {
logrus.Warnf("Failed to marshal metadata for %s: %v", esf.Name, err)
} else {
sf.Metadata = base64.StdEncoding.EncodeToString(b)
}
}
if tokenHash := esf.Annotations[AnnotationTokenHash]; tokenHash != "" {
sf.TokenHash = tokenHash
}
if esf.Spec.S3 == nil {
sf.NodeName = esf.Spec.NodeName
} else {
sf.NodeName = "s3"
sf.S3 = &S3Config{
EtcdS3: config.EtcdS3{
Endpoint: esf.Spec.S3.Endpoint,
EndpointCA: esf.Spec.S3.EndpointCA,
SkipSSLVerify: esf.Spec.S3.SkipSSLVerify,
Bucket: esf.Spec.S3.Bucket,
Region: esf.Spec.S3.Region,
Folder: esf.Spec.S3.Prefix,
Insecure: esf.Spec.S3.Insecure,
},
}
}
}
// ToETCDSnapshotFile translates fields from the File to the ETCDSnapshotFile
func (sf *File) ToETCDSnapshotFile(esf *k3s.ETCDSnapshotFile) {
if esf == nil {
panic("cannot convert to nil ETCDSnapshotFile")
}
esf.Spec.SnapshotName = sf.Name
esf.Spec.Location = sf.Location
esf.Status.CreationTime = sf.CreatedAt
esf.Status.ReadyToUse = ptr.To(sf.Status == SuccessfulStatus)
esf.Status.Size = resource.NewQuantity(sf.Size, resource.DecimalSI)
if sf.NodeSource != "" {
esf.Spec.NodeName = sf.NodeSource
} else {
esf.Spec.NodeName = sf.NodeName
}
if sf.Message != "" {
var message string
b, err := base64.StdEncoding.DecodeString(sf.Message)
if err != nil {
logrus.Warnf("Failed to decode error message for %s: %v", sf.Name, err)
message = "etcd snapshot failed"
} else {
message = string(b)
}
esf.Status.Error = &k3s.ETCDSnapshotError{
Time: sf.CreatedAt,
Message: &message,
}
}
if sf.MetadataSource != nil {
esf.Spec.Metadata = sf.MetadataSource.Data
} else if sf.Metadata != "" {
metadata, err := base64.StdEncoding.DecodeString(sf.Metadata)
if err != nil {
logrus.Warnf("Failed to decode metadata for %s: %v", sf.Name, err)
} else {
if err := json.Unmarshal(metadata, &esf.Spec.Metadata); err != nil {
logrus.Warnf("Failed to unmarshal metadata for %s: %v", sf.Name, err)
}
}
}
if esf.ObjectMeta.Labels == nil {
esf.ObjectMeta.Labels = map[string]string{}
}
if esf.ObjectMeta.Annotations == nil {
esf.ObjectMeta.Annotations = map[string]string{}
}
if sf.TokenHash != "" {
esf.ObjectMeta.Annotations[AnnotationTokenHash] = sf.TokenHash
}
if sf.S3 == nil {
esf.ObjectMeta.Labels[LabelStorageNode] = esf.Spec.NodeName
} else {
esf.ObjectMeta.Labels[LabelStorageNode] = "s3"
esf.Spec.S3 = &k3s.ETCDSnapshotS3{
Endpoint: sf.S3.Endpoint,
EndpointCA: sf.S3.EndpointCA,
SkipSSLVerify: sf.S3.SkipSSLVerify,
Bucket: sf.S3.Bucket,
Region: sf.S3.Region,
Prefix: sf.S3.Folder,
Insecure: sf.S3.Insecure,
}
}
}
// Marshal returns the JSON encoding of the snapshot File, with metadata inlined as base64.
func (sf *File) Marshal() ([]byte, error) {
if sf.MetadataSource != nil {
if m, err := json.Marshal(sf.MetadataSource.Data); err != nil {
logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", ExtraMetadataConfigMapName, err)
} else {
sf.Metadata = base64.StdEncoding.EncodeToString(m)
}
}
return json.Marshal(sf)
}
// IsNotExist returns true if the error is from http.StatusNotFound or os.IsNotExist
func IsNotExist(err error) bool {
if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound || os.IsNotExist(err) {
return true
}
return false
}

View File

@ -9,6 +9,7 @@ import (
"time"
apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
controllersv1 "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
@ -81,10 +82,10 @@ func (e *etcdSnapshotHandler) sync(key string, esf *apisv1.ETCDSnapshotFile) (*a
return nil, nil
}
sf := snapshotFile{}
sf.fromETCDSnapshotFile(esf)
sfKey := generateSnapshotConfigMapKey(sf)
m, err := marshalSnapshotFile(sf)
sf := &snapshot.File{}
sf.FromETCDSnapshotFile(esf)
sfKey := sf.GenerateConfigMapKey()
m, err := sf.Marshal()
if err != nil {
return nil, errors.Wrap(err, "failed to marshal snapshot ConfigMap data")
}
@ -283,9 +284,9 @@ func (e *etcdSnapshotHandler) reconcile() error {
// Ensure keys for existing snapshots
for sfKey, esf := range snapshots {
sf := snapshotFile{}
sf.fromETCDSnapshotFile(esf)
m, err := marshalSnapshotFile(sf)
sf := &snapshot.File{}
sf.FromETCDSnapshotFile(esf)
m, err := sf.Marshal()
if err != nil {
logrus.Warnf("Failed to marshal snapshot ConfigMap data for %s", sfKey)
continue
@ -327,12 +328,12 @@ func pruneConfigMap(snapshotConfigMap *v1.ConfigMap, pruneCount int) error {
return errors.New("unable to reduce snapshot ConfigMap size by eliding old snapshots")
}
var snapshotFiles []snapshotFile
var snapshotFiles []snapshot.File
retention := len(snapshotConfigMap.Data) - pruneCount
for name := range snapshotConfigMap.Data {
basename, compressed := strings.CutSuffix(name, compressedExtension)
basename, compressed := strings.CutSuffix(name, snapshot.CompressedExtension)
ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
snapshotFiles = append(snapshotFiles, snapshotFile{Name: name, CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed})
snapshotFiles = append(snapshotFiles, snapshot.File{Name: name, CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed})
}
// sort newest-first so we can prune entries past the retention count

View File

@ -11,8 +11,8 @@ import (
"github.com/k3s-io/k3s/pkg/cluster/managed"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type SnapshotOperation string
@ -24,21 +24,13 @@ const (
SnapshotOperationDelete SnapshotOperation = "delete"
)
type SnapshotRequestS3 struct {
s3Config
Timeout metav1.Duration `json:"timeout"`
AccessKey string `json:"accessKey"`
SecretKey string `json:"secretKey"`
}
type SnapshotRequest struct {
Operation SnapshotOperation `json:"operation"`
Name []string `json:"name,omitempty"`
Dir *string `json:"dir,omitempty"`
Compress *bool `json:"compress,omitempty"`
Retention *int `json:"retention,omitempty"`
S3 *SnapshotRequestS3 `json:"s3,omitempty"`
S3 *config.EtcdS3 `json:"s3,omitempty"`
ctx context.Context
}
@ -76,9 +68,12 @@ func (e *ETCD) snapshotHandler() http.Handler {
}
func (e *ETCD) handleList(rw http.ResponseWriter, req *http.Request) error {
if err := e.initS3IfNil(req.Context()); err != nil {
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
if e.config.EtcdS3 != nil {
if _, err := e.getS3Client(req.Context()); err != nil {
err = errors.Wrap(err, "failed to initialize S3 client")
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
}
}
sf, err := e.ListSnapshots(req.Context())
if sf == nil {
@ -90,9 +85,12 @@ func (e *ETCD) handleList(rw http.ResponseWriter, req *http.Request) error {
}
func (e *ETCD) handleSave(rw http.ResponseWriter, req *http.Request) error {
if err := e.initS3IfNil(req.Context()); err != nil {
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
if e.config.EtcdS3 != nil {
if _, err := e.getS3Client(req.Context()); err != nil {
err = errors.Wrap(err, "failed to initialize S3 client")
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
}
}
sr, err := e.Snapshot(req.Context())
if sr == nil {
@ -104,9 +102,12 @@ func (e *ETCD) handleSave(rw http.ResponseWriter, req *http.Request) error {
}
func (e *ETCD) handlePrune(rw http.ResponseWriter, req *http.Request) error {
if err := e.initS3IfNil(req.Context()); err != nil {
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
if e.config.EtcdS3 != nil {
if _, err := e.getS3Client(req.Context()); err != nil {
err = errors.Wrap(err, "failed to initialize S3 client")
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
}
}
sr, err := e.PruneSnapshots(req.Context())
if sr == nil {
@ -118,9 +119,12 @@ func (e *ETCD) handlePrune(rw http.ResponseWriter, req *http.Request) error {
}
func (e *ETCD) handleDelete(rw http.ResponseWriter, req *http.Request, snapshots []string) error {
if err := e.initS3IfNil(req.Context()); err != nil {
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
if e.config.EtcdS3 != nil {
if _, err := e.getS3Client(req.Context()); err != nil {
err = errors.Wrap(err, "failed to initialize S3 client")
util.SendError(err, rw, req, http.StatusBadRequest)
return nil
}
}
sr, err := e.DeleteSnapshots(req.Context(), snapshots)
if sr == nil {
@ -149,7 +153,9 @@ func (e *ETCD) withRequest(sr *SnapshotRequest) *ETCD {
EtcdSnapshotCompress: e.config.EtcdSnapshotCompress,
EtcdSnapshotName: e.config.EtcdSnapshotName,
EtcdSnapshotRetention: e.config.EtcdSnapshotRetention,
EtcdS3: sr.S3,
},
s3: e.s3,
name: e.name,
address: e.address,
cron: e.cron,
@ -168,19 +174,6 @@ func (e *ETCD) withRequest(sr *SnapshotRequest) *ETCD {
if sr.Retention != nil {
re.config.EtcdSnapshotRetention = *sr.Retention
}
if sr.S3 != nil {
re.config.EtcdS3 = true
re.config.EtcdS3AccessKey = sr.S3.AccessKey
re.config.EtcdS3BucketName = sr.S3.Bucket
re.config.EtcdS3Endpoint = sr.S3.Endpoint
re.config.EtcdS3EndpointCA = sr.S3.EndpointCA
re.config.EtcdS3Folder = sr.S3.Folder
re.config.EtcdS3Insecure = sr.S3.Insecure
re.config.EtcdS3Region = sr.S3.Region
re.config.EtcdS3SecretKey = sr.S3.SecretKey
re.config.EtcdS3SkipSSLVerify = sr.S3.SkipSSLVerify
re.config.EtcdS3Timeout = sr.S3.Timeout.Duration
}
return re
}

View File

@ -46,13 +46,8 @@ def provision(vm, role, role_num, node_num)
cluster-init: true
etcd-snapshot-schedule-cron: '*/1 * * * *'
etcd-snapshot-retention: 2
etcd-s3-insecure: true
etcd-s3-bucket: test-bucket
etcd-s3-folder: test-folder
etcd-s3: true
etcd-s3-endpoint: localhost:9090
etcd-s3-skip-ssl-verify: true
etcd-s3-access-key: test
etcd-s3-config-secret: k3s-etcd-s3-config
YAML
k3s.env = %W[K3S_KUBECONFIG_MODE=0644 #{install_type}]
k3s.config_mode = '0644' # side-step https://github.com/k3s-io/k3s/issues/4321

View File

@ -87,7 +87,31 @@ var _ = Describe("Verify Create", Ordered, func() {
fmt.Println(res)
Expect(err).NotTo(HaveOccurred())
})
It("save s3 snapshot", func() {
It("save s3 snapshot using CLI", func() {
res, err := e2e.RunCmdOnNode("k3s etcd-snapshot save "+
"--etcd-s3-insecure=true "+
"--etcd-s3-bucket=test-bucket "+
"--etcd-s3-folder=test-folder "+
"--etcd-s3-endpoint=localhost:9090 "+
"--etcd-s3-skip-ssl-verify=true "+
"--etcd-s3-access-key=test ",
serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("Snapshot on-demand-server-0"))
})
It("creates s3 config secret", func() {
res, err := e2e.RunCmdOnNode("k3s kubectl create secret generic k3s-etcd-s3-config --namespace=kube-system "+
"--from-literal=etcd-s3-insecure=true "+
"--from-literal=etcd-s3-bucket=test-bucket "+
"--from-literal=etcd-s3-folder=test-folder "+
"--from-literal=etcd-s3-endpoint=localhost:9090 "+
"--from-literal=etcd-s3-skip-ssl-verify=true "+
"--from-literal=etcd-s3-access-key=test ",
serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("secret/k3s-etcd-s3-config created"))
})
It("save s3 snapshot using secret", func() {
res, err := e2e.RunCmdOnNode("k3s etcd-snapshot save", serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("Snapshot on-demand-server-0"))