feat: metrics collection for replications remote writes (#22952)
* feat: metrics collection for replications remote writes
* fix: don't update metrics with 204 error code on successful writes
parent 906015023c
commit f05d0136f1
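For orientation before the diff: this PR adds three Prometheus collectors for remote writes (error codes, bytes sent, bytes dropped) on top of the existing replications queue metrics. Below is a minimal, standalone sketch of how a counter with this shape behaves, using client_golang directly. The "replications"/"queue" namespace and subsystem split and the example replication ID are assumptions inferred from the metric names asserted in the tests further down, not code from this PR.

package main

import (
    "fmt"
    "strconv"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    // Counter shaped like the one added in this PR; namespace/subsystem are
    // inferred from the asserted name "replications_queue_remote_write_errors".
    remoteWriteErrors := prometheus.NewCounterVec(prometheus.CounterOpts{
        Namespace: "replications",
        Subsystem: "queue",
        Name:      "remote_write_errors",
        Help:      "Error codes returned from attempted remote writes",
    }, []string{"replicationID", "code"})

    reg := prometheus.NewRegistry()
    reg.MustRegister(remoteWriteErrors)

    // Record two failed writes for a hypothetical replication ID.
    id := "0000000000000001"
    remoteWriteErrors.WithLabelValues(id, strconv.Itoa(418)).Inc()
    remoteWriteErrors.WithLabelValues(id, strconv.Itoa(418)).Inc()

    // A scrape of this registry would now report the labelled series at 2.
    fmt.Println(testutil.ToFloat64(remoteWriteErrors.WithLabelValues(id, "418"))) // 2
}

The real collectors are constructed in NewReplicationsMetrics and exposed through PrometheusCollectors, as the diff below shows.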
@@ -178,7 +178,7 @@ func (rq *replicationQueue) SendWrite() bool {
     // Update metrics after the call to scan.Advance()
     defer func() {
-        rq.metrics.Dequeue(rq.id, rq.queue.DiskUsage())
+        rq.metrics.Dequeue(rq.id, rq.queue.TotalBytes())
     }()

     if _, err = scan.Advance(); err != nil {

@@ -361,7 +361,7 @@ func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byte
         return err
     }
     // Update metrics for this replication queue when adding data to the queue.
-    qm.metrics.EnqueueData(replicationID, len(data), numPoints, rq.queue.DiskUsage())
+    qm.metrics.EnqueueData(replicationID, len(data), numPoints, rq.queue.TotalBytes())

     qm.replicationQueues[replicationID].receive <- struct{}{}
@@ -402,13 +402,14 @@ func TestEnqueueData_WithMetrics(t *testing.T) {
     reg := prom.NewRegistry(zaptest.NewLogger(t))
     reg.MustRegister(qm.metrics.PrometheusCollectors()...)

-    data := []byte("some fake data")
+    data := "some fake data"
     numPointsPerData := 3
     numDataToAdd := 4
+    rq.remoteWriter = getTestRemoteWriter(t, data, nil)

     for i := 1; i <= numDataToAdd; i++ {
         go func() { <-rq.receive }() // absorb the receive to avoid testcase deadlock
-        require.NoError(t, qm.EnqueueData(id1, data, numPointsPerData))
+        require.NoError(t, qm.EnqueueData(id1, []byte(data), numPointsPerData))

         pointCount := getPromMetric(t, "replications_queue_total_points_queued", reg)
         require.Equal(t, i*numPointsPerData, int(pointCount.Counter.GetValue()))

@@ -417,20 +418,14 @@ func TestEnqueueData_WithMetrics(t *testing.T) {
         require.Equal(t, i*len(data), int(totalBytesQueued.Counter.GetValue()))

         currentBytesQueued := getPromMetric(t, "replications_queue_current_bytes_queued", reg)
-        // 8 bytes for an empty queue; 8 extra bytes for each byte slice appended to the queue
-        require.Equal(t, 8+i*(8+len(data)), int(currentBytesQueued.Gauge.GetValue()))
+        // 8 extra bytes for each byte slice appended to the queue
+        require.Equal(t, i*(8+len(data)), int(currentBytesQueued.Gauge.GetValue()))
     }

-    // Reduce the max segment size so that a new segment is created & the next call to SendWrite causes the first
-    // segment to be dropped and the queue size on disk to be lower than before when the queue head is advanced.
-    require.NoError(t, rq.queue.SetMaxSegmentSize(8))
-
-    queueSizeBefore := rq.queue.DiskUsage()
+    // Queue size should be 0 after SendWrite completes
     rq.SendWrite()

-    // Ensure that the smaller queue disk size was reflected in the metrics.
     currentBytesQueued := getPromMetric(t, "replications_queue_current_bytes_queued", reg)
-    require.Less(t, int64(currentBytesQueued.Gauge.GetValue()), queueSizeBefore)
+    require.Equal(t, float64(0), currentBytesQueued.Gauge.GetValue())
 }

 func getPromMetric(t *testing.T, name string, reg *prom.Registry) *dto.Metric {
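The reworked assertion above implies a simple sizing model for the durable queue: after n appends of the same payload, TotalBytes() is expected to equal n*(8+len(payload)), i.e. each appended byte slice appears to carry an 8-byte record header. A small sketch of that arithmetic, with the per-record header size taken from the test comment rather than from the queue implementation:

package main

import "fmt"

// expectedQueueBytes mirrors the assertion in TestEnqueueData_WithMetrics:
// n appended byte slices are expected to occupy n*(8+len(payload)) bytes,
// assuming an 8-byte header per appended slice (per the test comment).
func expectedQueueBytes(n int, payload []byte) int {
    return n * (8 + len(payload))
}

func main() {
    data := []byte("some fake data") // 14 bytes, as in the test
    for i := 1; i <= 4; i++ {
        fmt.Println(i, expectedQueueBytes(i, data)) // 22, 44, 66, 88
    }
}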
@@ -1,14 +1,19 @@
 package metrics

 import (
+    "strconv"
+
     "github.com/influxdata/influxdb/v2/kit/platform"
     "github.com/prometheus/client_golang/prometheus"
 )

 type ReplicationsMetrics struct {
-    TotalPointsQueued  *prometheus.CounterVec
-    TotalBytesQueued   *prometheus.CounterVec
-    CurrentBytesQueued *prometheus.GaugeVec
+    TotalPointsQueued       *prometheus.CounterVec
+    TotalBytesQueued        *prometheus.CounterVec
+    CurrentBytesQueued      *prometheus.GaugeVec
+    RemoteWriteErrors       *prometheus.CounterVec
+    RemoteWriteBytesSent    *prometheus.CounterVec
+    RemoteWriteBytesDropped *prometheus.CounterVec
 }

 func NewReplicationsMetrics() *ReplicationsMetrics {

@@ -34,6 +39,24 @@ func NewReplicationsMetrics() *ReplicationsMetrics {
             Name:      "current_bytes_queued",
             Help:      "Current number of bytes in the replication stream queue",
         }, []string{"replicationID"}),
+        RemoteWriteErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "remote_write_errors",
+            Help:      "Error codes returned from attempted remote writes",
+        }, []string{"replicationID", "code"}),
+        RemoteWriteBytesSent: prometheus.NewCounterVec(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "remote_write_bytes_sent",
+            Help:      "Bytes of data successfully sent by the replication stream",
+        }, []string{"replicationID"}),
+        RemoteWriteBytesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "remote_write_bytes_dropped",
+            Help:      "Bytes of data dropped by the replication stream",
+        }, []string{"replicationID"}),
     }
 }

@@ -43,17 +66,35 @@ func (rm *ReplicationsMetrics) PrometheusCollectors() []prometheus.Collector {
         rm.TotalPointsQueued,
         rm.TotalBytesQueued,
         rm.CurrentBytesQueued,
+        rm.RemoteWriteErrors,
+        rm.RemoteWriteBytesSent,
+        rm.RemoteWriteBytesDropped,
     }
 }

 // EnqueueData updates the metrics when adding new data to a replication queue.
-func (rm *ReplicationsMetrics) EnqueueData(replicationID platform.ID, numBytes, numPoints int, queueSizeOnDisk int64) {
+func (rm *ReplicationsMetrics) EnqueueData(replicationID platform.ID, numBytes, numPoints int, queueSize int64) {
     rm.TotalPointsQueued.WithLabelValues(replicationID.String()).Add(float64(numPoints))
     rm.TotalBytesQueued.WithLabelValues(replicationID.String()).Add(float64(numBytes))
-    rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSizeOnDisk))
+    rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize))
 }

 // Dequeue updates the metrics when data has been removed from the queue.
-func (rm *ReplicationsMetrics) Dequeue(replicationID platform.ID, queueSizeOnDisk int64) {
-    rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSizeOnDisk))
+func (rm *ReplicationsMetrics) Dequeue(replicationID platform.ID, queueSize int64) {
+    rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize))
 }
+
+// RemoteWriteError increments the error code counter for the replication.
+func (rm *ReplicationsMetrics) RemoteWriteError(replicationID platform.ID, errorCode int) {
+    rm.RemoteWriteErrors.WithLabelValues(replicationID.String(), strconv.Itoa(errorCode)).Inc()
+}
+
+// RemoteWriteSent increases the total count of bytes sent following a successful remote write
+func (rm *ReplicationsMetrics) RemoteWriteSent(replicationID platform.ID, bytes int) {
+    rm.RemoteWriteBytesSent.WithLabelValues(replicationID.String()).Add(float64(bytes))
+}
+
+// RemoteWriteDropped increases the total count of bytes dropped when data is dropped
+func (rm *ReplicationsMetrics) RemoteWriteDropped(replicationID platform.ID, bytes int) {
+    rm.RemoteWriteBytesDropped.WithLabelValues(replicationID.String()).Add(float64(bytes))
+}
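The new helper methods give callers (the remote writer, in this PR) a small typed surface for recording outcomes. A hedged usage sketch against the API as it appears in this diff; the kit/prom registry and logger wiring mirror what the tests do, and zap.NewNop() is only a stand-in logger:

package main

import (
    "net/http"

    "github.com/influxdata/influxdb/v2/kit/platform"
    "github.com/influxdata/influxdb/v2/kit/prom"
    "github.com/influxdata/influxdb/v2/replications/metrics"
    "go.uber.org/zap"
)

func main() {
    rm := metrics.NewReplicationsMetrics()

    // Register the collectors the same way the tests in this PR do.
    reg := prom.NewRegistry(zap.NewNop())
    reg.MustRegister(rm.PrometheusCollectors()...)

    id := platform.ID(1)

    // Successful remote write of 128 bytes.
    rm.RemoteWriteSent(id, 128)

    // Retryable failure: recorded under the status code label.
    rm.RemoteWriteError(id, http.StatusTooManyRequests)

    // Non-retryable 400 with DropNonRetryableData enabled: bytes are dropped.
    rm.RemoteWriteDropped(id, 128)
}

Scraping that registry should expose replications_queue_remote_write_bytes_sent, replications_queue_remote_write_errors (labelled with the status code, e.g. "429"), and replications_queue_remote_write_bytes_dropped, each labelled with the replication ID.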
@@ -106,15 +106,21 @@ func (w *writer) Write(ctx context.Context, data []byte) error {
             return err
         }

         // Update metrics and most recent error diagnostic information.
         if err := w.configStore.UpdateResponseInfo(ctx, w.replicationID, res.StatusCode, msg); err != nil {
             return err
         }

         if err == nil {
             // Successful write
+            w.metrics.RemoteWriteSent(w.replicationID, len(data))
             w.logger.Debug("remote write successful", zap.Int("attempt", attempts), zap.Int("bytes", len(data)))
             return nil
         }

+        w.metrics.RemoteWriteError(w.replicationID, res.StatusCode)
         w.logger.Debug("remote write error", zap.Int("attempt", attempts), zap.String("error message", "msg"), zap.Int("status code", res.StatusCode))

         attempts++
         var waitTime time.Duration
         hasSetWaitTime := false

@@ -122,7 +128,8 @@ func (w *writer) Write(ctx context.Context, data []byte) error {
         switch res.StatusCode {
         case http.StatusBadRequest:
             if conf.DropNonRetryableData {
-                w.logger.Debug(fmt.Sprintf("dropped %d bytes of data due to %d response from server", len(data), http.StatusBadRequest))
+                w.logger.Debug("dropped data", zap.Int("bytes", len(data)))
+                w.metrics.RemoteWriteDropped(w.replicationID, len(data))
                 return nil
             }
         case http.StatusTooManyRequests:

@@ -136,6 +143,7 @@ func (w *writer) Write(ctx context.Context, data []byte) error {
         if !hasSetWaitTime {
             waitTime = w.backoff(attempts)
         }
+        w.logger.Debug("waiting to retry", zap.Duration("wait time", waitTime))

         select {
         case <-w.waitFunc(waitTime):
@@ -14,6 +14,8 @@ import (
     "github.com/golang/mock/gomock"
     "github.com/influxdata/influxdb/v2"
     "github.com/influxdata/influxdb/v2/kit/platform"
+    "github.com/influxdata/influxdb/v2/kit/prom"
+    "github.com/influxdata/influxdb/v2/kit/prom/promtest"
     "github.com/influxdata/influxdb/v2/replications/metrics"
     replicationsMock "github.com/influxdata/influxdb/v2/replications/mock"
     "github.com/stretchr/testify/require"

@@ -244,6 +246,102 @@ func TestWrite(t *testing.T) {
         })
     }
 }

+func TestWrite_Metrics(t *testing.T) {
+    maximumAttemptsBeforeErr := 5
+    testData := []byte("this is some data")
+
+    tests := []struct {
+        name                 string
+        status               int
+        data                 []byte
+        wantWriteErr         error
+        registerExpectations func(*testing.T, *replicationsMock.MockHttpConfigStore, *influxdb.ReplicationHTTPConfig)
+        checkMetrics         func(*testing.T, *prom.Registry)
+    }{
+        {
+            name:         "server errors",
+            status:       http.StatusTeapot,
+            data:         []byte{},
+            wantWriteErr: errors.New("maximum number of attempts exceeded"),
+            registerExpectations: func(t *testing.T, store *replicationsMock.MockHttpConfigStore, conf *influxdb.ReplicationHTTPConfig) {
+                store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil).Times(5)
+                store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTeapot, invalidResponseCode(http.StatusTeapot).Error()).Return(nil).Times(5)
+            },
+            checkMetrics: func(t *testing.T, reg *prom.Registry) {
+                mfs := promtest.MustGather(t, reg)
+
+                errorCodes := promtest.FindMetric(mfs, "replications_queue_remote_write_errors", map[string]string{
+                    "replicationID": testID.String(),
+                    "code":          strconv.Itoa(http.StatusTeapot),
+                })
+                require.NotNil(t, errorCodes)
+                require.Equal(t, float64(maximumAttemptsBeforeErr), errorCodes.Counter.GetValue())
+            },
+        },
+        {
+            name:         "successful write",
+            status:       http.StatusNoContent,
+            data:         testData,
+            wantWriteErr: nil,
+            registerExpectations: func(t *testing.T, store *replicationsMock.MockHttpConfigStore, conf *influxdb.ReplicationHTTPConfig) {
+                store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil)
+                store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusNoContent, "").Return(nil)
+            },
+            checkMetrics: func(t *testing.T, reg *prom.Registry) {
+                mfs := promtest.MustGather(t, reg)
+
+                bytesSent := promtest.FindMetric(mfs, "replications_queue_remote_write_bytes_sent", map[string]string{
+                    "replicationID": testID.String(),
+                })
+                require.NotNil(t, bytesSent)
+                require.Equal(t, float64(len(testData)), bytesSent.Counter.GetValue())
+            },
+        },
+        {
+            name:         "dropped data",
+            status:       http.StatusBadRequest,
+            data:         testData,
+            wantWriteErr: nil,
+            registerExpectations: func(t *testing.T, store *replicationsMock.MockHttpConfigStore, conf *influxdb.ReplicationHTTPConfig) {
+                store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil)
+                store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusBadRequest, invalidResponseCode(http.StatusBadRequest).Error()).Return(nil)
+            },
+            checkMetrics: func(t *testing.T, reg *prom.Registry) {
+                mfs := promtest.MustGather(t, reg)
+
+                bytesDropped := promtest.FindMetric(mfs, "replications_queue_remote_write_bytes_dropped", map[string]string{
+                    "replicationID": testID.String(),
+                })
+                require.NotNil(t, bytesDropped)
+                require.Equal(t, float64(len(testData)), bytesDropped.Counter.GetValue())
+            },
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            svr := testServer(t, tt.status, tt.data)
+            defer svr.Close()
+
+            testConfig := &influxdb.ReplicationHTTPConfig{
+                RemoteURL:            svr.URL,
+                DropNonRetryableData: true,
+            }
+
+            w, configStore := testWriter(t)
+            reg := prom.NewRegistry(zaptest.NewLogger(t))
+            reg.MustRegister(w.metrics.PrometheusCollectors()...)
+
+            w.waitFunc = instaWait()
+            w.maximumAttemptsBeforeErr = maximumAttemptsBeforeErr
+
+            tt.registerExpectations(t, configStore, testConfig)
+            require.Equal(t, tt.wantWriteErr, w.Write(context.Background(), tt.data))
+            tt.checkMetrics(t, reg)
+        })
+    }
+}
+
 func TestPostWrite(t *testing.T) {
     testData := []byte("some data")