123 lines
5.1 KiB
Go
123 lines
5.1 KiB
Go
package metrics
|
|
|
|
import (
|
|
"strconv"
|
|
|
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
type ReplicationsMetrics struct {
|
|
TotalPointsQueued *prometheus.CounterVec
|
|
TotalBytesQueued *prometheus.CounterVec
|
|
CurrentBytesQueued *prometheus.GaugeVec
|
|
RemoteWriteErrors *prometheus.CounterVec
|
|
RemoteWriteBytesSent *prometheus.CounterVec
|
|
RemoteWriteBytesDropped *prometheus.CounterVec
|
|
PointsFailedToQueue *prometheus.CounterVec
|
|
BytesFailedToQueue *prometheus.CounterVec
|
|
}
|
|
|
|
func NewReplicationsMetrics() *ReplicationsMetrics {
|
|
const namespace = "replications"
|
|
const subsystem = "queue"
|
|
|
|
return &ReplicationsMetrics{
|
|
TotalPointsQueued: prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "total_points_queued",
|
|
Help: "Sum of all points that have been successfully added to the replication stream queue",
|
|
}, []string{"replicationID"}),
|
|
TotalBytesQueued: prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "total_bytes_queued",
|
|
Help: "Sum of all bytes that have been successfully added to the replication stream queue",
|
|
}, []string{"replicationID"}),
|
|
CurrentBytesQueued: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "current_bytes_queued",
|
|
Help: "Current number of bytes in the replication stream queue remaining to be processed",
|
|
}, []string{"replicationID"}),
|
|
RemoteWriteErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "remote_write_errors",
|
|
Help: "Error codes returned from attempted remote writes",
|
|
}, []string{"replicationID", "code"}),
|
|
RemoteWriteBytesSent: prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "remote_write_bytes_sent",
|
|
Help: "Bytes of data successfully sent to the remote by the replication stream",
|
|
}, []string{"replicationID"}),
|
|
RemoteWriteBytesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "remote_write_bytes_dropped",
|
|
Help: "Bytes of data dropped due to remote write failures",
|
|
}, []string{"replicationID"}),
|
|
PointsFailedToQueue: prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "points_failed_to_queue",
|
|
Help: "Sum of all points that could not be added to the local replication queue",
|
|
}, []string{"replicationID"}),
|
|
BytesFailedToQueue: prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "bytes_failed_to_queue",
|
|
Help: "Sum of all bytes that could not be added to the local replication queue",
|
|
}, []string{"replicationID"}),
|
|
}
|
|
}
|
|
|
|
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
|
|
func (rm *ReplicationsMetrics) PrometheusCollectors() []prometheus.Collector {
|
|
return []prometheus.Collector{
|
|
rm.TotalPointsQueued,
|
|
rm.TotalBytesQueued,
|
|
rm.CurrentBytesQueued,
|
|
rm.RemoteWriteErrors,
|
|
rm.RemoteWriteBytesSent,
|
|
rm.RemoteWriteBytesDropped,
|
|
rm.PointsFailedToQueue,
|
|
rm.BytesFailedToQueue,
|
|
}
|
|
}
|
|
|
|
// EnqueueData updates the metrics when adding new data to a replication queue.
|
|
func (rm *ReplicationsMetrics) EnqueueData(replicationID platform.ID, numBytes, numPoints int, queueSize int64) {
|
|
rm.TotalPointsQueued.WithLabelValues(replicationID.String()).Add(float64(numPoints))
|
|
rm.TotalBytesQueued.WithLabelValues(replicationID.String()).Add(float64(numBytes))
|
|
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize))
|
|
}
|
|
|
|
// Dequeue updates the metrics when data has been removed from the queue.
|
|
func (rm *ReplicationsMetrics) Dequeue(replicationID platform.ID, queueSize int64) {
|
|
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize))
|
|
}
|
|
|
|
// EnqueueError updates the metrics when data fails to be added to the replication queue.
|
|
func (rm *ReplicationsMetrics) EnqueueError(replicationID platform.ID, numBytes, numPoints int) {
|
|
rm.PointsFailedToQueue.WithLabelValues(replicationID.String()).Add(float64(numPoints))
|
|
rm.BytesFailedToQueue.WithLabelValues(replicationID.String()).Add(float64(numBytes))
|
|
}
|
|
|
|
// RemoteWriteError increments the error code counter for the replication.
|
|
func (rm *ReplicationsMetrics) RemoteWriteError(replicationID platform.ID, errorCode int) {
|
|
rm.RemoteWriteErrors.WithLabelValues(replicationID.String(), strconv.Itoa(errorCode)).Inc()
|
|
}
|
|
|
|
// RemoteWriteSent increases the total count of bytes sent following a successful remote write
|
|
func (rm *ReplicationsMetrics) RemoteWriteSent(replicationID platform.ID, bytes int) {
|
|
rm.RemoteWriteBytesSent.WithLabelValues(replicationID.String()).Add(float64(bytes))
|
|
}
|
|
|
|
// RemoteWriteDropped increases the total count of bytes dropped when data is dropped
|
|
func (rm *ReplicationsMetrics) RemoteWriteDropped(replicationID platform.ID, bytes int) {
|
|
rm.RemoteWriteBytesDropped.WithLabelValues(replicationID.String()).Add(float64(bytes))
|
|
}
|