feat: replications metrics include failure to enqueue (#22962)
* feat: replications metrics include failure to enqueuepull/22965/head
parent
28bcd416b2
commit
6096ee2ad4
|
@ -352,10 +352,17 @@ func (qm *durableQueueManager) CloseAll() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnqueueData persists a set of bytes to a replication's durable queue.
|
// EnqueueData persists a set of bytes to a replication's durable queue.
|
||||||
func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byte, numPoints int) error {
|
func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byte, numPoints int) (err error) {
|
||||||
qm.mutex.RLock()
|
qm.mutex.RLock()
|
||||||
defer qm.mutex.RUnlock()
|
defer qm.mutex.RUnlock()
|
||||||
|
|
||||||
|
// Update metrics if data fails to be added to queue
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
qm.metrics.EnqueueError(replicationID, len(data), numPoints)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
rq, ok := qm.replicationQueues[replicationID]
|
rq, ok := qm.replicationQueues[replicationID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
|
return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
|
||||||
|
|
|
@ -427,6 +427,32 @@ func TestEnqueueData_WithMetrics(t *testing.T) {
|
||||||
require.Equal(t, float64(0), currentBytesQueued.Gauge.GetValue())
|
require.Equal(t, float64(0), currentBytesQueued.Gauge.GetValue())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEnqueueData_EnqueueFailure(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
path, qm := initQueueManager(t)
|
||||||
|
defer os.RemoveAll(path)
|
||||||
|
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
|
||||||
|
require.DirExists(t, filepath.Join(path, id1.String()))
|
||||||
|
|
||||||
|
rq, ok := qm.replicationQueues[id1]
|
||||||
|
require.True(t, ok)
|
||||||
|
// Close the underlying queue so an error is generated if we try to append to it
|
||||||
|
require.NoError(t, rq.queue.Close())
|
||||||
|
|
||||||
|
reg := prom.NewRegistry(zaptest.NewLogger(t))
|
||||||
|
reg.MustRegister(qm.metrics.PrometheusCollectors()...)
|
||||||
|
|
||||||
|
data := "some fake data"
|
||||||
|
numPointsPerData := 3
|
||||||
|
require.Error(t, qm.EnqueueData(id1, []byte(data), numPointsPerData)) // this will generate an error because of the closed queue
|
||||||
|
|
||||||
|
droppedPoints := getPromMetric(t, "replications_queue_points_failed_to_queue", reg)
|
||||||
|
require.Equal(t, numPointsPerData, int(droppedPoints.Counter.GetValue()))
|
||||||
|
droppedBytes := getPromMetric(t, "replications_queue_bytes_failed_to_queue", reg)
|
||||||
|
require.Equal(t, len(data), int(droppedBytes.Counter.GetValue()))
|
||||||
|
}
|
||||||
|
|
||||||
func getPromMetric(t *testing.T, name string, reg *prom.Registry) *dto.Metric {
|
func getPromMetric(t *testing.T, name string, reg *prom.Registry) *dto.Metric {
|
||||||
mfs := promtest.MustGather(t, reg)
|
mfs := promtest.MustGather(t, reg)
|
||||||
return promtest.FindMetric(mfs, name, map[string]string{
|
return promtest.FindMetric(mfs, name, map[string]string{
|
||||||
|
|
|
@ -14,6 +14,8 @@ type ReplicationsMetrics struct {
|
||||||
RemoteWriteErrors *prometheus.CounterVec
|
RemoteWriteErrors *prometheus.CounterVec
|
||||||
RemoteWriteBytesSent *prometheus.CounterVec
|
RemoteWriteBytesSent *prometheus.CounterVec
|
||||||
RemoteWriteBytesDropped *prometheus.CounterVec
|
RemoteWriteBytesDropped *prometheus.CounterVec
|
||||||
|
PointsFailedToQueue *prometheus.CounterVec
|
||||||
|
BytesFailedToQueue *prometheus.CounterVec
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewReplicationsMetrics() *ReplicationsMetrics {
|
func NewReplicationsMetrics() *ReplicationsMetrics {
|
||||||
|
@ -25,19 +27,19 @@ func NewReplicationsMetrics() *ReplicationsMetrics {
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: subsystem,
|
Subsystem: subsystem,
|
||||||
Name: "total_points_queued",
|
Name: "total_points_queued",
|
||||||
Help: "Sum of all points that have been added to the replication stream queue",
|
Help: "Sum of all points that have been successfully added to the replication stream queue",
|
||||||
}, []string{"replicationID"}),
|
}, []string{"replicationID"}),
|
||||||
TotalBytesQueued: prometheus.NewCounterVec(prometheus.CounterOpts{
|
TotalBytesQueued: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: subsystem,
|
Subsystem: subsystem,
|
||||||
Name: "total_bytes_queued",
|
Name: "total_bytes_queued",
|
||||||
Help: "Sum of all bytes that have been added to the replication stream queue",
|
Help: "Sum of all bytes that have been successfully added to the replication stream queue",
|
||||||
}, []string{"replicationID"}),
|
}, []string{"replicationID"}),
|
||||||
CurrentBytesQueued: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
CurrentBytesQueued: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: subsystem,
|
Subsystem: subsystem,
|
||||||
Name: "current_bytes_queued",
|
Name: "current_bytes_queued",
|
||||||
Help: "Current number of bytes in the replication stream queue",
|
Help: "Current number of bytes in the replication stream queue remaining to be processed",
|
||||||
}, []string{"replicationID"}),
|
}, []string{"replicationID"}),
|
||||||
RemoteWriteErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
|
RemoteWriteErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
|
@ -49,13 +51,25 @@ func NewReplicationsMetrics() *ReplicationsMetrics {
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: subsystem,
|
Subsystem: subsystem,
|
||||||
Name: "remote_write_bytes_sent",
|
Name: "remote_write_bytes_sent",
|
||||||
Help: "Bytes of data successfully sent by the replication stream",
|
Help: "Bytes of data successfully sent to the remote by the replication stream",
|
||||||
}, []string{"replicationID"}),
|
}, []string{"replicationID"}),
|
||||||
RemoteWriteBytesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
|
RemoteWriteBytesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: subsystem,
|
Subsystem: subsystem,
|
||||||
Name: "remote_write_bytes_dropped",
|
Name: "remote_write_bytes_dropped",
|
||||||
Help: "Bytes of data dropped by the replication stream",
|
Help: "Bytes of data dropped due to remote write failures",
|
||||||
|
}, []string{"replicationID"}),
|
||||||
|
PointsFailedToQueue: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "points_failed_to_queue",
|
||||||
|
Help: "Sum of all points that could not be added to the local replication queue",
|
||||||
|
}, []string{"replicationID"}),
|
||||||
|
BytesFailedToQueue: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "bytes_failed_to_queue",
|
||||||
|
Help: "Sum of all bytes that could not be added to the local replication queue",
|
||||||
}, []string{"replicationID"}),
|
}, []string{"replicationID"}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -69,6 +83,8 @@ func (rm *ReplicationsMetrics) PrometheusCollectors() []prometheus.Collector {
|
||||||
rm.RemoteWriteErrors,
|
rm.RemoteWriteErrors,
|
||||||
rm.RemoteWriteBytesSent,
|
rm.RemoteWriteBytesSent,
|
||||||
rm.RemoteWriteBytesDropped,
|
rm.RemoteWriteBytesDropped,
|
||||||
|
rm.PointsFailedToQueue,
|
||||||
|
rm.BytesFailedToQueue,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,6 +100,12 @@ func (rm *ReplicationsMetrics) Dequeue(replicationID platform.ID, queueSize int6
|
||||||
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize))
|
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EnqueueError updates the metrics when data fails to be added to the replication queue.
|
||||||
|
func (rm *ReplicationsMetrics) EnqueueError(replicationID platform.ID, numBytes, numPoints int) {
|
||||||
|
rm.PointsFailedToQueue.WithLabelValues(replicationID.String()).Add(float64(numPoints))
|
||||||
|
rm.BytesFailedToQueue.WithLabelValues(replicationID.String()).Add(float64(numBytes))
|
||||||
|
}
|
||||||
|
|
||||||
// RemoteWriteError increments the error code counter for the replication.
|
// RemoteWriteError increments the error code counter for the replication.
|
||||||
func (rm *ReplicationsMetrics) RemoteWriteError(replicationID platform.ID, errorCode int) {
|
func (rm *ReplicationsMetrics) RemoteWriteError(replicationID platform.ID, errorCode int) {
|
||||||
rm.RemoteWriteErrors.WithLabelValues(replicationID.String(), strconv.Itoa(errorCode)).Inc()
|
rm.RemoteWriteErrors.WithLabelValues(replicationID.String(), strconv.Itoa(errorCode)).Inc()
|
||||||
|
|
Loading…
Reference in New Issue