From 6096ee2ad4fb0b8de308e23fe7fe0bd0a363de63 Mon Sep 17 00:00:00 2001 From: William Baker Date: Thu, 2 Dec 2021 14:42:55 -0600 Subject: [PATCH] feat: replications metrics include failure to enqueue (#22962) * feat: replications metrics include failure to enqueue --- replications/internal/queue_management.go | 9 +++++- .../internal/queue_management_test.go | 26 +++++++++++++++ replications/metrics/replications_metrics.go | 32 ++++++++++++++++--- 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/replications/internal/queue_management.go b/replications/internal/queue_management.go index ed7589f624..0a58b4c2fe 100644 --- a/replications/internal/queue_management.go +++ b/replications/internal/queue_management.go @@ -352,10 +352,17 @@ func (qm *durableQueueManager) CloseAll() error { } // EnqueueData persists a set of bytes to a replication's durable queue. -func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byte, numPoints int) error { +func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byte, numPoints int) (err error) { qm.mutex.RLock() defer qm.mutex.RUnlock() + // Update metrics if data fails to be added to queue + defer func() { + if err != nil { + qm.metrics.EnqueueError(replicationID, len(data), numPoints) + } + }() + rq, ok := qm.replicationQueues[replicationID] if !ok { return fmt.Errorf("durable queue not found for replication ID %q", replicationID) diff --git a/replications/internal/queue_management_test.go b/replications/internal/queue_management_test.go index a7f6d12e2c..dfa84b13bd 100644 --- a/replications/internal/queue_management_test.go +++ b/replications/internal/queue_management_test.go @@ -427,6 +427,32 @@ func TestEnqueueData_WithMetrics(t *testing.T) { require.Equal(t, float64(0), currentBytesQueued.Gauge.GetValue()) } +func TestEnqueueData_EnqueueFailure(t *testing.T) { + t.Parallel() + + path, qm := initQueueManager(t) + defer os.RemoveAll(path) + require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes)) + require.DirExists(t, filepath.Join(path, id1.String())) + + rq, ok := qm.replicationQueues[id1] + require.True(t, ok) + // Close the underlying queue so an error is generated if we try to append to it + require.NoError(t, rq.queue.Close()) + + reg := prom.NewRegistry(zaptest.NewLogger(t)) + reg.MustRegister(qm.metrics.PrometheusCollectors()...) + + data := "some fake data" + numPointsPerData := 3 + require.Error(t, qm.EnqueueData(id1, []byte(data), numPointsPerData)) // this will generate an error because of the closed queue + + droppedPoints := getPromMetric(t, "replications_queue_points_failed_to_queue", reg) + require.Equal(t, numPointsPerData, int(droppedPoints.Counter.GetValue())) + droppedBytes := getPromMetric(t, "replications_queue_bytes_failed_to_queue", reg) + require.Equal(t, len(data), int(droppedBytes.Counter.GetValue())) +} + func getPromMetric(t *testing.T, name string, reg *prom.Registry) *dto.Metric { mfs := promtest.MustGather(t, reg) return promtest.FindMetric(mfs, name, map[string]string{ diff --git a/replications/metrics/replications_metrics.go b/replications/metrics/replications_metrics.go index ce435fff8a..a15f191839 100644 --- a/replications/metrics/replications_metrics.go +++ b/replications/metrics/replications_metrics.go @@ -14,6 +14,8 @@ type ReplicationsMetrics struct { RemoteWriteErrors *prometheus.CounterVec RemoteWriteBytesSent *prometheus.CounterVec RemoteWriteBytesDropped *prometheus.CounterVec + PointsFailedToQueue *prometheus.CounterVec + BytesFailedToQueue *prometheus.CounterVec } func NewReplicationsMetrics() *ReplicationsMetrics { @@ -25,19 +27,19 @@ func NewReplicationsMetrics() *ReplicationsMetrics { Namespace: namespace, Subsystem: subsystem, Name: "total_points_queued", - Help: "Sum of all points that have been added to the replication stream queue", + Help: "Sum of all points that have been successfully added to the replication stream queue", }, []string{"replicationID"}), TotalBytesQueued: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "total_bytes_queued", - Help: "Sum of all bytes that have been added to the replication stream queue", + Help: "Sum of all bytes that have been successfully added to the replication stream queue", }, []string{"replicationID"}), CurrentBytesQueued: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "current_bytes_queued", - Help: "Current number of bytes in the replication stream queue", + Help: "Current number of bytes in the replication stream queue remaining to be processed", }, []string{"replicationID"}), RemoteWriteErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, @@ -49,13 +51,25 @@ func NewReplicationsMetrics() *ReplicationsMetrics { Namespace: namespace, Subsystem: subsystem, Name: "remote_write_bytes_sent", - Help: "Bytes of data successfully sent by the replication stream", + Help: "Bytes of data successfully sent to the remote by the replication stream", }, []string{"replicationID"}), RemoteWriteBytesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "remote_write_bytes_dropped", - Help: "Bytes of data dropped by the replication stream", + Help: "Bytes of data dropped due to remote write failures", + }, []string{"replicationID"}), + PointsFailedToQueue: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "points_failed_to_queue", + Help: "Sum of all points that could not be added to the local replication queue", + }, []string{"replicationID"}), + BytesFailedToQueue: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "bytes_failed_to_queue", + Help: "Sum of all bytes that could not be added to the local replication queue", }, []string{"replicationID"}), } } @@ -69,6 +83,8 @@ func (rm *ReplicationsMetrics) PrometheusCollectors() []prometheus.Collector { rm.RemoteWriteErrors, rm.RemoteWriteBytesSent, rm.RemoteWriteBytesDropped, + rm.PointsFailedToQueue, + rm.BytesFailedToQueue, } } @@ -84,6 +100,12 @@ func (rm *ReplicationsMetrics) Dequeue(replicationID platform.ID, queueSize int6 rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize)) } +// EnqueueError updates the metrics when data fails to be added to the replication queue. +func (rm *ReplicationsMetrics) EnqueueError(replicationID platform.ID, numBytes, numPoints int) { + rm.PointsFailedToQueue.WithLabelValues(replicationID.String()).Add(float64(numPoints)) + rm.BytesFailedToQueue.WithLabelValues(replicationID.String()).Add(float64(numBytes)) +} + // RemoteWriteError increments the error code counter for the replication. func (rm *ReplicationsMetrics) RemoteWriteError(replicationID platform.ID, errorCode int) { rm.RemoteWriteErrors.WithLabelValues(replicationID.String(), strconv.Itoa(errorCode)).Inc()