commit 82e1ebbe0c

@@ -0,0 +1 @@
+Add data upload and download metrics
@@ -224,9 +224,9 @@ func (s *nodeAgentServer) run() {
             s.logger.Fatalf("Failed to start metric server for node agent at [%s]: %v", s.metricsAddress, err)
         }
     }()
-    s.metrics = metrics.NewPodVolumeMetrics()
+    s.metrics = metrics.NewNodeMetrics()
     s.metrics.RegisterAllMetrics()
-    s.metrics.InitPodVolumeMetricsForNode(s.nodeName)
+    s.metrics.InitMetricsForNode(s.nodeName)
 
     s.markInProgressCRsFailed()
 
@@ -260,13 +260,13 @@ func (s *nodeAgentServer) run() {
         s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
     }
 
-    dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger)
+    dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
     s.markDataUploadsCancel(dataUploadReconciler)
     if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
         s.logger.WithError(err).Fatal("Unable to create the data upload controller")
     }
 
-    dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger)
+    dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
     s.markDataDownloadsCancel(dataDownloadReconciler)
     if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
         s.logger.WithError(err).Fatal("Unable to create the data download controller")
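Taken together, the two hunks above mean the node agent builds a single *metrics.ServerMetrics via metrics.NewNodeMetrics(), registers it, pre-initializes the per-node series, and hands the same instance to both data mover reconcilers. A minimal standalone sketch of that wiring, assuming RegisterAllMetrics registers the collectors with the default Prometheus registry (which the node agent's promhttp-backed metrics server implies); the listen address and node name below are placeholders, not values from this commit:

    package main

    import (
        "log"
        "net/http"

        "github.com/prometheus/client_golang/prometheus/promhttp"

        "github.com/vmware-tanzu/velero/pkg/metrics"
    )

    func main() {
        m := metrics.NewNodeMetrics()  // renamed from NewPodVolumeMetrics in this commit
        m.RegisterAllMetrics()         // assumed: registers the collectors with the default Prometheus registry
        m.InitMetricsForNode("node-1") // pre-creates the per-node counters at 0 (placeholder node name)

        // The same *ServerMetrics value is then passed to
        // controller.NewDataUploadReconciler(..., m) and
        // controller.NewDataDownloadReconciler(..., m), as in the hunks above.

        http.Handle("/metrics", promhttp.Handler())
        log.Fatal(http.ListenAndServe(":8085", nil)) // placeholder address
    }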
@@ -44,6 +44,7 @@ import (
     datamover "github.com/vmware-tanzu/velero/pkg/datamover"
     "github.com/vmware-tanzu/velero/pkg/datapath"
     "github.com/vmware-tanzu/velero/pkg/exposer"
+    "github.com/vmware-tanzu/velero/pkg/metrics"
     repository "github.com/vmware-tanzu/velero/pkg/repository"
     "github.com/vmware-tanzu/velero/pkg/uploader"
     "github.com/vmware-tanzu/velero/pkg/util/filesystem"
@@ -63,10 +64,11 @@ type DataDownloadReconciler struct {
     repositoryEnsurer *repository.Ensurer
     dataPathMgr       *datapath.Manager
     preparingTimeout  time.Duration
+    metrics           *metrics.ServerMetrics
 }
 
 func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface,
-    repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger) *DataDownloadReconciler {
+    repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
     return &DataDownloadReconciler{
         client:     client,
         kubeClient: kubeClient,
@@ -79,6 +81,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter
         restoreExposer:    exposer.NewGenericRestoreExposer(kubeClient, logger),
         dataPathMgr:       datapath.NewManager(1),
         preparingTimeout:  preparingTimeout,
+        metrics:           metrics,
     }
 }
 
@@ -301,6 +304,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na
         log.WithError(err).Error("error updating data download status")
     } else {
         log.Infof("Data download is marked as %s", dd.Status.Phase)
+        r.metrics.RegisterDataDownloadSuccess(r.nodeName)
     }
 }
 
@@ -343,6 +347,8 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
         dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
         if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
             log.WithError(err).Error("error updating data download status")
+        } else {
+            r.metrics.RegisterDataDownloadCancel(r.nodeName)
         }
     }
 }
@@ -497,6 +503,8 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
 
     if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil {
         log.WithError(patchErr).Error("error updating DataDownload status")
+    } else {
+        r.metrics.RegisterDataDownloadFailure(r.nodeName)
     }
 
     return err
@@ -548,6 +556,8 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
     r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
 
     log.Info("Dataupload has been cleaned up")
+
+    r.metrics.RegisterDataDownloadFailure(r.nodeName)
 }
 
 func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload,
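One pattern is consistent across these hunks (and the matching data upload hunks below): a success, cancel, or failure counter is bumped only in the else branch of the status patch, i.e. only after the new phase has been persisted, and a prepare timeout is also recorded as a failure. A self-contained sketch of that "persist first, then count" ordering with a plain Prometheus counter; the counter name, persist function, and node label here are illustrative stand-ins, not Velero APIs:

    package main

    import (
        "errors"
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    // Illustrative stand-in for one of the per-node counters added in this commit.
    var downloadSuccess = prometheus.NewCounterVec(
        prometheus.CounterOpts{Name: "data_download_success_total", Help: "illustrative counter"},
        []string{"node"},
    )

    // persistStatus stands in for the client.Patch call that records the new phase.
    func persistStatus(fail bool) error {
        if fail {
            return errors.New("patch failed")
        }
        return nil
    }

    func markCompleted(node string, patchFails bool) {
        if err := persistStatus(patchFails); err != nil {
            fmt.Println("error updating status:", err) // nothing is counted when the patch fails
        } else {
            downloadSuccess.WithLabelValues(node).Inc() // counted only after the phase is persisted
        }
    }

    func main() {
        prometheus.MustRegister(downloadSuccess)
        markCompleted("node-1", false) // counted
        markCompleted("node-1", true)  // not counted
    }

A consequence of this ordering is that an operation whose status patch fails never shows up in the counters.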
@@ -46,6 +46,7 @@ import (
     "github.com/vmware-tanzu/velero/pkg/datapath"
     datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
     "github.com/vmware-tanzu/velero/pkg/exposer"
+    "github.com/vmware-tanzu/velero/pkg/metrics"
     velerotest "github.com/vmware-tanzu/velero/pkg/test"
     "github.com/vmware-tanzu/velero/pkg/uploader"
 
@@ -136,7 +137,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...
     if err != nil {
         return nil, err
     }
-    return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger()), nil
+    return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
 }
 
 func TestDataDownloadReconcile(t *testing.T) {
@@ -46,6 +46,7 @@ import (
     "github.com/vmware-tanzu/velero/pkg/datamover"
     "github.com/vmware-tanzu/velero/pkg/datapath"
     "github.com/vmware-tanzu/velero/pkg/exposer"
+    "github.com/vmware-tanzu/velero/pkg/metrics"
     "github.com/vmware-tanzu/velero/pkg/repository"
     "github.com/vmware-tanzu/velero/pkg/uploader"
     "github.com/vmware-tanzu/velero/pkg/util/filesystem"
@@ -71,11 +72,12 @@ type DataUploadReconciler struct {
     snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
     dataPathMgr         *datapath.Manager
     preparingTimeout    time.Duration
+    metrics             *metrics.ServerMetrics
 }
 
 func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface,
     csiSnapshotClient snapshotter.SnapshotV1Interface, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution,
-    cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger) *DataUploadReconciler {
+    cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
     return &DataUploadReconciler{
         client:     client,
         kubeClient: kubeClient,
@@ -89,6 +91,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa
         snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)},
         dataPathMgr:         datapath.NewManager(1),
         preparingTimeout:    preparingTimeout,
+        metrics:             metrics,
     }
 }
 
@@ -308,8 +311,10 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
 
     if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
         log.WithError(err).Error("error updating DataUpload status")
+    } else {
+        log.Info("Data upload completed")
+        r.metrics.RegisterDataUploadSuccess(r.nodeName)
     }
-    log.Info("Data upload completed")
 }
 
 func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) {
@@ -360,6 +365,8 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp
         du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
         if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
             log.WithError(err).Error("error updating DataUpload status")
+        } else {
+            r.metrics.RegisterDataUploadCancel(r.nodeName)
         }
     }
 }
@@ -518,6 +525,8 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
     du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
     if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
         log.WithError(patchErr).Error("error updating DataUpload status")
+    } else {
+        r.metrics.RegisterDataUploadFailure(r.nodeName)
     }
 
     return err
@@ -580,6 +589,8 @@ func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov
 
         log.Info("Dataupload has been cleaned up")
     }
+
+    r.metrics.RegisterDataUploadFailure(r.nodeName)
 }
 
 func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload,
@@ -46,6 +46,7 @@ import (
     "github.com/vmware-tanzu/velero/pkg/builder"
     "github.com/vmware-tanzu/velero/pkg/datapath"
     "github.com/vmware-tanzu/velero/pkg/exposer"
+    "github.com/vmware-tanzu/velero/pkg/metrics"
     "github.com/vmware-tanzu/velero/pkg/repository"
     velerotest "github.com/vmware-tanzu/velero/pkg/test"
     "github.com/vmware-tanzu/velero/pkg/uploader"
@@ -193,7 +194,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
         return nil, err
     }
     return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), nil,
-        testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger()), nil
+        testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
 }
 
 func dataUploadBuilder() *builder.DataUploadBuilder {
@@ -192,7 +192,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
             r := PodVolumeBackupReconciler{
                 Client:           fakeClient,
                 clock:            testclocks.NewFakeClock(now),
-                metrics:          metrics.NewPodVolumeMetrics(),
+                metrics:          metrics.NewNodeMetrics(),
                 credentialGetter: &credentials.CredentialGetter{FromFile: credentialFileStore},
                 nodeName:         "test_node",
                 fileSystem:       fakeFS,
@@ -66,6 +66,14 @@ const (
     podVolumeOperationLatencySeconds      = "pod_volume_operation_latency_seconds"
     podVolumeOperationLatencyGaugeSeconds = "pod_volume_operation_latency_seconds_gauge"
 
+    // data mover metrics
+    DataUploadSuccessTotal   = "data_upload_success_total"
+    DataUploadFailureTotal   = "data_upload_failure_total"
+    DataUploadCancelTotal    = "data_upload_cancel_total"
+    DataDownloadSuccessTotal = "data_download_success_total"
+    DataDownloadFailureTotal = "data_download_failure_total"
+    DataDownloadCancelTotal  = "data_download_cancel_total"
+
     // Labels
     nodeMetricLabel         = "node"
     podVolumeOperationLabel = "operation"
@@ -319,7 +327,7 @@ func NewServerMetrics() *ServerMetrics {
     }
 }
 
-func NewPodVolumeMetrics() *ServerMetrics {
+func NewNodeMetrics() *ServerMetrics {
     return &ServerMetrics{
         metrics: map[string]prometheus.Collector{
             podVolumeBackupEnqueueTotal: prometheus.NewCounterVec(
@@ -365,6 +373,54 @@
                 },
                 []string{nodeMetricLabel, podVolumeOperationLabel, backupNameLabel, pvbNameLabel},
             ),
+            DataUploadSuccessTotal: prometheus.NewCounterVec(
+                prometheus.CounterOpts{
+                    Namespace: podVolumeMetricsNamespace,
+                    Name:      DataUploadSuccessTotal,
+                    Help:      "Total number of successful uploaded snapshots",
+                },
+                []string{nodeMetricLabel},
+            ),
+            DataUploadFailureTotal: prometheus.NewCounterVec(
+                prometheus.CounterOpts{
+                    Namespace: podVolumeMetricsNamespace,
+                    Name:      DataUploadFailureTotal,
+                    Help:      "Total number of failed uploaded snapshots",
+                },
+                []string{nodeMetricLabel},
+            ),
+            DataUploadCancelTotal: prometheus.NewCounterVec(
+                prometheus.CounterOpts{
+                    Namespace: podVolumeMetricsNamespace,
+                    Name:      DataUploadCancelTotal,
+                    Help:      "Total number of canceled uploaded snapshots",
+                },
+                []string{nodeMetricLabel},
+            ),
+            DataDownloadSuccessTotal: prometheus.NewCounterVec(
+                prometheus.CounterOpts{
+                    Namespace: podVolumeMetricsNamespace,
+                    Name:      DataDownloadSuccessTotal,
+                    Help:      "Total number of successful downloaded snapshots",
+                },
+                []string{nodeMetricLabel},
+            ),
+            DataDownloadFailureTotal: prometheus.NewCounterVec(
+                prometheus.CounterOpts{
+                    Namespace: podVolumeMetricsNamespace,
+                    Name:      DataDownloadFailureTotal,
+                    Help:      "Total number of failed downloaded snapshots",
+                },
+                []string{nodeMetricLabel},
+            ),
+            DataDownloadCancelTotal: prometheus.NewCounterVec(
+                prometheus.CounterOpts{
+                    Namespace: podVolumeMetricsNamespace,
+                    Name:      DataDownloadCancelTotal,
+                    Help:      "Total number of canceled downloaded snapshots",
+                },
+                []string{nodeMetricLabel},
+            ),
         },
     }
 }
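The six new collectors are plain per-node CounterVecs. The snippet below, built only on the Prometheus client library, shows why the Add(0) calls in InitMetricsForNode (next hunk) matter: a labeled series does not exist, and so is not scraped, until it is touched once. The "podvolume" namespace is an assumption for illustration; the actual value of podVolumeMetricsNamespace is defined outside this diff.

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/testutil"
    )

    func main() {
        // Same shape as the counters added above; namespace value is assumed.
        c := prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Namespace: "podvolume",
                Name:      "data_upload_success_total",
                Help:      "Total number of successful uploaded snapshots",
            },
            []string{"node"},
        )

        fmt.Println(testutil.CollectAndCount(c)) // 0: no per-node series exists yet

        c.WithLabelValues("node-1").Add(0)       // what InitMetricsForNode does per node
        fmt.Println(testutil.CollectAndCount(c)) // 1: the series now exists with value 0

        c.WithLabelValues("node-1").Inc()        // what RegisterDataUploadSuccess does
        fmt.Println(testutil.ToFloat64(c.WithLabelValues("node-1"))) // 1
    }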
@@ -450,13 +506,31 @@ func (m *ServerMetrics) InitSchedule(scheduleName string) {
 }
 
 // InitSchedule initializes counter metrics for a node.
-func (m *ServerMetrics) InitPodVolumeMetricsForNode(node string) {
+func (m *ServerMetrics) InitMetricsForNode(node string) {
     if c, ok := m.metrics[podVolumeBackupEnqueueTotal].(*prometheus.CounterVec); ok {
         c.WithLabelValues(node).Add(0)
     }
     if c, ok := m.metrics[podVolumeBackupDequeueTotal].(*prometheus.CounterVec); ok {
         c.WithLabelValues(node).Add(0)
     }
+    if c, ok := m.metrics[DataUploadSuccessTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Add(0)
+    }
+    if c, ok := m.metrics[DataUploadFailureTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Add(0)
+    }
+    if c, ok := m.metrics[DataUploadCancelTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Add(0)
+    }
+    if c, ok := m.metrics[DataDownloadSuccessTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Add(0)
+    }
+    if c, ok := m.metrics[DataDownloadFailureTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Add(0)
+    }
+    if c, ok := m.metrics[DataDownloadCancelTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Add(0)
+    }
 }
 
 // RegisterPodVolumeBackupEnqueue records enqueuing of a PodVolumeBackup object.
@@ -473,6 +547,48 @@ func (m *ServerMetrics) RegisterPodVolumeBackupDequeue(node string) {
     }
 }
 
+// RegisterDataUploadSuccess records successful uploaded snapshots.
+func (m *ServerMetrics) RegisterDataUploadSuccess(node string) {
+    if c, ok := m.metrics[DataUploadSuccessTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Inc()
+    }
+}
+
+// RegisterDataUploadFailure records failed uploaded snapshots.
+func (m *ServerMetrics) RegisterDataUploadFailure(node string) {
+    if c, ok := m.metrics[DataUploadFailureTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Inc()
+    }
+}
+
+// RegisterDataUploadCancel records canceled uploaded snapshots.
+func (m *ServerMetrics) RegisterDataUploadCancel(node string) {
+    if c, ok := m.metrics[DataUploadCancelTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Inc()
+    }
+}
+
+// RegisterDataDownloadSuccess records successful downloaded snapshots.
+func (m *ServerMetrics) RegisterDataDownloadSuccess(node string) {
+    if c, ok := m.metrics[DataDownloadSuccessTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Inc()
+    }
+}
+
+// RegisterDataDownloadFailure records failed downloaded snapshots.
+func (m *ServerMetrics) RegisterDataDownloadFailure(node string) {
+    if c, ok := m.metrics[DataDownloadFailureTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Inc()
+    }
+}
+
+// RegisterDataDownloadCancel records canceled downloaded snapshots.
+func (m *ServerMetrics) RegisterDataDownloadCancel(node string) {
+    if c, ok := m.metrics[DataDownloadCancelTotal].(*prometheus.CounterVec); ok {
+        c.WithLabelValues(node).Inc()
+    }
+}
+
 // ObservePodVolumeOpLatency records the number of seconds a pod volume operation took.
 func (m *ServerMetrics) ObservePodVolumeOpLatency(node, pvbName, opName, backupName string, seconds float64) {
     if h, ok := m.metrics[podVolumeOperationLatencySeconds].(*prometheus.HistogramVec); ok {
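If one wanted to unit-test the new Register helpers, a check like the following could work inside the velero pkg/metrics package; it reaches into the unexported metrics map shown above, so it only compiles within that package, and the use of prometheus/testutil and testify here is an assumption of this sketch rather than something the commit adds:

    package metrics

    import (
        "testing"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/testutil"
        "github.com/stretchr/testify/assert"
    )

    func TestRegisterDataMoverMetrics(t *testing.T) {
        m := NewNodeMetrics()
        node := "test_node"

        m.RegisterDataUploadSuccess(node)
        m.RegisterDataUploadSuccess(node)
        m.RegisterDataDownloadFailure(node)

        uploads := m.metrics[DataUploadSuccessTotal].(*prometheus.CounterVec)
        downloads := m.metrics[DataDownloadFailureTotal].(*prometheus.CounterVec)

        assert.Equal(t, float64(2), testutil.ToFloat64(uploads.WithLabelValues(node)))
        assert.Equal(t, float64(1), testutil.ToFloat64(downloads.WithLabelValues(node)))
    }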