mirror of https://github.com/k3s-io/k3s.git
Add etcd snapshot metrics
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 6199b79f4b
)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/11928/head
parent
adb495931b
commit
edc5203598
|
@ -11,6 +11,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/inetaf/tcpproxy"
|
||||
"github.com/k3s-io/k3s/pkg/util/metrics"
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
@ -99,13 +100,8 @@ func New(ctx context.Context, dataDir, serviceName, defaultServerURL string, lbS
|
|||
OnDialError: onDialError,
|
||||
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
start := time.Now()
|
||||
status := "success"
|
||||
conn, err := lb.servers.dialContext(ctx, network, address)
|
||||
latency := time.Since(start)
|
||||
if err != nil {
|
||||
status = "error"
|
||||
}
|
||||
loadbalancerDials.WithLabelValues(serviceName, status).Observe(latency.Seconds())
|
||||
metrics.ObserveWithStatus(loadbalancerDials, start, err, serviceName)
|
||||
return conn, err
|
||||
},
|
||||
})
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/k3s-io/k3s/pkg/etcd/s3"
|
||||
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
|
||||
"github.com/k3s-io/k3s/pkg/util"
|
||||
"github.com/k3s-io/k3s/pkg/util/metrics"
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/robfig/cron/v3"
|
||||
|
@ -191,6 +192,20 @@ func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, err
|
|||
// subcommand for prune that can be run manually if the user wants to remove old snapshots.
|
||||
// Returns metadata about the new and pruned snapshots.
|
||||
func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
|
||||
res, err := e.snapshot(ctx)
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
return res, e.reconcileSnapshotData(ctx, res)
|
||||
}
|
||||
|
||||
// snapshot is the actual snapshot save/upload implementation.
|
||||
// This is not inline in the Snapshot function so that the save and reconcile operation
|
||||
// metrics do not overlap.
|
||||
func (e *ETCD) snapshot(ctx context.Context) (_ *managed.SnapshotResult, rerr error) {
|
||||
snapshotStart := time.Now()
|
||||
defer metrics.ObserveWithStatus(snapshotSaveCount, snapshotStart, rerr)
|
||||
|
||||
if !e.snapshotMu.TryLock() {
|
||||
return nil, errors.New("snapshot save already in progress")
|
||||
}
|
||||
|
@ -243,7 +258,11 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
|
|||
|
||||
var sf *snapshot.File
|
||||
|
||||
if err := snapshotv3.Save(ctx, e.client.GetLogger(), *cfg, snapshotPath); err != nil {
|
||||
saveStart := time.Now()
|
||||
err = snapshotv3.Save(ctx, e.client.GetLogger(), *cfg, snapshotPath)
|
||||
metrics.ObserveWithStatus(snapshotSaveLocalCount, saveStart, err)
|
||||
|
||||
if err != nil {
|
||||
sf = &snapshot.File{
|
||||
Name: snapshotName,
|
||||
Location: "",
|
||||
|
@ -319,9 +338,11 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
|
|||
res.Deleted = append(res.Deleted, deleted...)
|
||||
|
||||
if e.config.EtcdS3 != nil {
|
||||
s3Start := time.Now()
|
||||
if s3client, err := e.getS3Client(ctx); err != nil {
|
||||
logrus.Warnf("Unable to initialize S3 client: %v", err)
|
||||
if !errors.Is(err, s3.ErrNoConfigSecret) {
|
||||
metrics.ObserveWithStatus(snapshotSaveS3Count, s3Start, err)
|
||||
err = errors.Wrap(err, "failed to initialize S3 client")
|
||||
sf = &snapshot.File{
|
||||
Name: f.Name(),
|
||||
|
@ -341,6 +362,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
|
|||
// upload will return a snapshot.File even on error - if there was an
|
||||
// error, it will be reflected in the status and message.
|
||||
sf, err = s3client.Upload(ctx, snapshotPath, extraMetadata, now)
|
||||
metrics.ObserveWithStatus(snapshotSaveS3Count, s3Start, err)
|
||||
if err != nil {
|
||||
logrus.Errorf("Error received during snapshot upload to S3: %s", err)
|
||||
} else {
|
||||
|
@ -365,7 +387,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
|
|||
}
|
||||
}
|
||||
|
||||
return res, e.reconcileSnapshotData(ctx, res)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// listLocalSnapshots provides a list of the currently stored
|
||||
|
@ -659,7 +681,10 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
|||
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to
|
||||
// list S3 snapshots and reconcile snapshots from S3. Any snapshots listed in the Deleted field of
|
||||
// the provided SnapshotResult are deleted, even if they are within a retention window.
|
||||
func (e *ETCD) reconcileSnapshotData(ctx context.Context, res *managed.SnapshotResult) error {
|
||||
func (e *ETCD) reconcileSnapshotData(ctx context.Context, res *managed.SnapshotResult) (rerr error) {
|
||||
reconcileStart := time.Now()
|
||||
defer metrics.ObserveWithStatus(snapshotReconcileCount, reconcileStart, rerr)
|
||||
|
||||
// make sure the core.Factory is initialized. There can
|
||||
// be a race between this core code startup.
|
||||
for e.config.Runtime.Core == nil {
|
||||
|
@ -670,7 +695,9 @@ func (e *ETCD) reconcileSnapshotData(ctx context.Context, res *managed.SnapshotR
|
|||
defer logrus.Infof("Reconciliation of ETCDSnapshotFile resources complete")
|
||||
|
||||
// Get snapshots from local filesystem
|
||||
localStart := time.Now()
|
||||
snapshotFiles, err := e.listLocalSnapshots()
|
||||
metrics.ObserveWithStatus(snapshotReconcileLocalCount, localStart, err)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -679,13 +706,17 @@ func (e *ETCD) reconcileSnapshotData(ctx context.Context, res *managed.SnapshotR
|
|||
|
||||
// Get snapshots from S3
|
||||
if e.config.EtcdS3 != nil {
|
||||
s3Start := time.Now()
|
||||
if s3client, err := e.getS3Client(ctx); err != nil {
|
||||
logrus.Warnf("Unable to initialize S3 client: %v", err)
|
||||
if !errors.Is(err, s3.ErrNoConfigSecret) {
|
||||
metrics.ObserveWithStatus(snapshotReconcileS3Count, s3Start, err)
|
||||
return errors.Wrap(err, "failed to initialize S3 client")
|
||||
}
|
||||
} else {
|
||||
if s3Snapshots, err := s3client.ListSnapshots(ctx); err != nil {
|
||||
s3Snapshots, err := s3client.ListSnapshots(ctx)
|
||||
metrics.ObserveWithStatus(snapshotReconcileS3Count, s3Start, err)
|
||||
if err != nil {
|
||||
logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err)
|
||||
} else {
|
||||
for k, v := range s3Snapshots {
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"github.com/k3s-io/k3s/pkg/version"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"k8s.io/component-base/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
snapshotSaveCount = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: version.Program + "_etcd_snapshot_save_duration_seconds",
|
||||
Help: "Total time taken to complete the etcd snapshot process",
|
||||
Buckets: metrics.ExponentialBuckets(0.008, 2, 15),
|
||||
}, []string{"status"})
|
||||
|
||||
snapshotSaveLocalCount = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: version.Program + "_etcd_snapshot_save_local_duration_seconds",
|
||||
Help: "Total time taken to save a local snapshot file",
|
||||
Buckets: metrics.ExponentialBuckets(0.008, 2, 15),
|
||||
}, []string{"status"})
|
||||
|
||||
snapshotSaveS3Count = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: version.Program + "_etcd_snapshot_save_s3_duration_seconds",
|
||||
Help: "Total time taken to upload a snapshot file to S3",
|
||||
Buckets: metrics.ExponentialBuckets(0.008, 2, 15),
|
||||
}, []string{"status"})
|
||||
|
||||
snapshotReconcileCount = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: version.Program + "_etcd_snapshot_reconcile_duration_seconds",
|
||||
Help: "Total time taken to sync the list of etcd snapshots",
|
||||
Buckets: metrics.ExponentialBuckets(0.008, 2, 15),
|
||||
}, []string{"status"})
|
||||
|
||||
snapshotReconcileLocalCount = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: version.Program + "_etcd_snapshot_reconcile_local_duration_seconds",
|
||||
Help: "Total time taken to list local snapshot files",
|
||||
Buckets: metrics.ExponentialBuckets(0.008, 2, 15),
|
||||
}, []string{"status"})
|
||||
|
||||
snapshotReconcileS3Count = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: version.Program + "_etcd_snapshot_reconcile_s3_duration_seconds",
|
||||
Help: "Total time taken to list S3 snapshot files",
|
||||
Buckets: metrics.ExponentialBuckets(0.008, 2, 15),
|
||||
}, []string{"status"})
|
||||
)
|
||||
|
||||
// MustRegister registers etcd snapshot metrics
|
||||
func MustRegister(registerer prometheus.Registerer) {
|
||||
registerer.MustRegister(
|
||||
snapshotSaveCount,
|
||||
snapshotSaveLocalCount,
|
||||
snapshotSaveS3Count,
|
||||
snapshotReconcileCount,
|
||||
snapshotReconcileLocalCount,
|
||||
snapshotReconcileS3Count,
|
||||
)
|
||||
}
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/k3s-io/k3s/pkg/agent/https"
|
||||
"github.com/k3s-io/k3s/pkg/agent/loadbalancer"
|
||||
"github.com/k3s-io/k3s/pkg/daemons/config"
|
||||
"github.com/k3s-io/k3s/pkg/etcd"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
lassometrics "github.com/rancher/lasso/pkg/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
|
@ -35,6 +36,8 @@ func init() {
|
|||
lassometrics.MustRegister(DefaultRegisterer)
|
||||
// same for loadbalancer metrics
|
||||
loadbalancer.MustRegister(DefaultRegisterer)
|
||||
// and etcd snapshot metrics
|
||||
etcd.MustRegister(DefaultRegisterer)
|
||||
}
|
||||
|
||||
// Config holds fields for the metrics listener
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
func ObserveWithStatus(vec *prometheus.HistogramVec, start time.Time, err error, labels ...string) {
|
||||
status := "success"
|
||||
if err != nil {
|
||||
status = "error"
|
||||
}
|
||||
labels = append(labels, status)
|
||||
vec.WithLabelValues(labels...).Observe(time.Since(start).Seconds())
|
||||
}
|
Loading…
Reference in New Issue