diff --git a/replications/internal/queue_management.go b/replications/internal/queue_management.go index 5c84617ea0..fe179cf894 100644 --- a/replications/internal/queue_management.go +++ b/replications/internal/queue_management.go @@ -178,7 +178,7 @@ func (rq *replicationQueue) SendWrite() bool { // Update metrics after the call to scan.Advance() defer func() { - rq.metrics.Dequeue(rq.id, rq.queue.DiskUsage()) + rq.metrics.Dequeue(rq.id, rq.queue.TotalBytes()) }() if _, err = scan.Advance(); err != nil { @@ -361,7 +361,7 @@ func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byt return err } // Update metrics for this replication queue when adding data to the queue. - qm.metrics.EnqueueData(replicationID, len(data), numPoints, rq.queue.DiskUsage()) + qm.metrics.EnqueueData(replicationID, len(data), numPoints, rq.queue.TotalBytes()) qm.replicationQueues[replicationID].receive <- struct{}{} diff --git a/replications/internal/queue_management_test.go b/replications/internal/queue_management_test.go index 722524f9d4..4d400b9287 100644 --- a/replications/internal/queue_management_test.go +++ b/replications/internal/queue_management_test.go @@ -402,13 +402,14 @@ func TestEnqueueData_WithMetrics(t *testing.T) { reg := prom.NewRegistry(zaptest.NewLogger(t)) reg.MustRegister(qm.metrics.PrometheusCollectors()...) 
- data := []byte("some fake data") + data := "some fake data" numPointsPerData := 3 numDataToAdd := 4 + rq.remoteWriter = getTestRemoteWriter(t, data, nil) for i := 1; i <= numDataToAdd; i++ { go func() { <-rq.receive }() // absorb the receive to avoid testcase deadlock - require.NoError(t, qm.EnqueueData(id1, data, numPointsPerData)) + require.NoError(t, qm.EnqueueData(id1, []byte(data), numPointsPerData)) pointCount := getPromMetric(t, "replications_queue_total_points_queued", reg) require.Equal(t, i*numPointsPerData, int(pointCount.Counter.GetValue())) @@ -417,20 +418,14 @@ func TestEnqueueData_WithMetrics(t *testing.T) { require.Equal(t, i*len(data), int(totalBytesQueued.Counter.GetValue())) currentBytesQueued := getPromMetric(t, "replications_queue_current_bytes_queued", reg) - // 8 bytes for an empty queue; 8 extra bytes for each byte slice appended to the queue - require.Equal(t, 8+i*(8+len(data)), int(currentBytesQueued.Gauge.GetValue())) + // 8 extra bytes for each byte slice appended to the queue + require.Equal(t, i*(8+len(data)), int(currentBytesQueued.Gauge.GetValue())) } - // Reduce the max segment size so that a new segment is created & the next call to SendWrite causes the first - // segment to be dropped and the queue size on disk to be lower than before when the queue head is advanced. - require.NoError(t, rq.queue.SetMaxSegmentSize(8)) - - queueSizeBefore := rq.queue.DiskUsage() + // Queue size should be 0 after SendWrite completes rq.SendWrite() - - // Ensure that the smaller queue disk size was reflected in the metrics. 
currentBytesQueued := getPromMetric(t, "replications_queue_current_bytes_queued", reg) - require.Less(t, int64(currentBytesQueued.Gauge.GetValue()), queueSizeBefore) + require.Equal(t, float64(0), currentBytesQueued.Gauge.GetValue()) } func getPromMetric(t *testing.T, name string, reg *prom.Registry) *dto.Metric { diff --git a/replications/metrics/replications_metrics.go b/replications/metrics/replications_metrics.go index f83eb49151..ce435fff8a 100644 --- a/replications/metrics/replications_metrics.go +++ b/replications/metrics/replications_metrics.go @@ -1,14 +1,19 @@ package metrics import ( + "strconv" + "github.com/influxdata/influxdb/v2/kit/platform" "github.com/prometheus/client_golang/prometheus" ) type ReplicationsMetrics struct { - TotalPointsQueued *prometheus.CounterVec - TotalBytesQueued *prometheus.CounterVec - CurrentBytesQueued *prometheus.GaugeVec + TotalPointsQueued *prometheus.CounterVec + TotalBytesQueued *prometheus.CounterVec + CurrentBytesQueued *prometheus.GaugeVec + RemoteWriteErrors *prometheus.CounterVec + RemoteWriteBytesSent *prometheus.CounterVec + RemoteWriteBytesDropped *prometheus.CounterVec } func NewReplicationsMetrics() *ReplicationsMetrics { @@ -34,6 +39,24 @@ func NewReplicationsMetrics() *ReplicationsMetrics { Name: "current_bytes_queued", Help: "Current number of bytes in the replication stream queue", }, []string{"replicationID"}), + RemoteWriteErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "remote_write_errors", + Help: "Error codes returned from attempted remote writes", + }, []string{"replicationID", "code"}), + RemoteWriteBytesSent: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "remote_write_bytes_sent", + Help: "Bytes of data successfully sent by the replication stream", + }, []string{"replicationID"}), + RemoteWriteBytesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: 
namespace, + Subsystem: subsystem, + Name: "remote_write_bytes_dropped", + Help: "Bytes of data dropped by the replication stream", + }, []string{"replicationID"}), } } @@ -43,17 +66,35 @@ func (rm *ReplicationsMetrics) PrometheusCollectors() []prometheus.Collector { rm.TotalPointsQueued, rm.TotalBytesQueued, rm.CurrentBytesQueued, + rm.RemoteWriteErrors, + rm.RemoteWriteBytesSent, + rm.RemoteWriteBytesDropped, } } // EnqueueData updates the metrics when adding new data to a replication queue. -func (rm *ReplicationsMetrics) EnqueueData(replicationID platform.ID, numBytes, numPoints int, queueSizeOnDisk int64) { +func (rm *ReplicationsMetrics) EnqueueData(replicationID platform.ID, numBytes, numPoints int, queueSize int64) { rm.TotalPointsQueued.WithLabelValues(replicationID.String()).Add(float64(numPoints)) rm.TotalBytesQueued.WithLabelValues(replicationID.String()).Add(float64(numBytes)) - rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSizeOnDisk)) + rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize)) } // Dequeue updates the metrics when data has been removed from the queue. -func (rm *ReplicationsMetrics) Dequeue(replicationID platform.ID, queueSizeOnDisk int64) { - rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSizeOnDisk)) +func (rm *ReplicationsMetrics) Dequeue(replicationID platform.ID, queueSize int64) { + rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize)) +} + +// RemoteWriteError increments the error code counter for the replication. 
+func (rm *ReplicationsMetrics) RemoteWriteError(replicationID platform.ID, errorCode int) { + rm.RemoteWriteErrors.WithLabelValues(replicationID.String(), strconv.Itoa(errorCode)).Inc() +} + +// RemoteWriteSent increases the total count of bytes sent following a successful remote write +func (rm *ReplicationsMetrics) RemoteWriteSent(replicationID platform.ID, bytes int) { + rm.RemoteWriteBytesSent.WithLabelValues(replicationID.String()).Add(float64(bytes)) +} + +// RemoteWriteDropped increases the total count of bytes dropped when data is dropped +func (rm *ReplicationsMetrics) RemoteWriteDropped(replicationID platform.ID, bytes int) { + rm.RemoteWriteBytesDropped.WithLabelValues(replicationID.String()).Add(float64(bytes)) } diff --git a/replications/remotewrite/writer.go b/replications/remotewrite/writer.go index 84b686dbdc..7e0f32534b 100644 --- a/replications/remotewrite/writer.go +++ b/replications/remotewrite/writer.go @@ -106,15 +106,21 @@ func (w *writer) Write(ctx context.Context, data []byte) error { return err } + // Update metrics and most recent error diagnostic information. 
if err := w.configStore.UpdateResponseInfo(ctx, w.replicationID, res.StatusCode, msg); err != nil { return err } if err == nil { // Successful write + w.metrics.RemoteWriteSent(w.replicationID, len(data)) + w.logger.Debug("remote write successful", zap.Int("attempt", attempts), zap.Int("bytes", len(data))) return nil } + w.metrics.RemoteWriteError(w.replicationID, res.StatusCode) + w.logger.Debug("remote write error", zap.Int("attempt", attempts), zap.String("error message", msg), zap.Int("status code", res.StatusCode)) + attempts++ var waitTime time.Duration hasSetWaitTime := false @@ -122,7 +128,8 @@ func (w *writer) Write(ctx context.Context, data []byte) error { switch res.StatusCode { case http.StatusBadRequest: if conf.DropNonRetryableData { - w.logger.Debug(fmt.Sprintf("dropped %d bytes of data due to %d response from server", len(data), http.StatusBadRequest)) + w.logger.Debug("dropped data", zap.Int("bytes", len(data))) + w.metrics.RemoteWriteDropped(w.replicationID, len(data)) return nil } case http.StatusTooManyRequests: @@ -136,6 +143,7 @@ func (w *writer) Write(ctx context.Context, data []byte) error { if !hasSetWaitTime { waitTime = w.backoff(attempts) } + w.logger.Debug("waiting to retry", zap.Duration("wait time", waitTime)) select { case <-w.waitFunc(waitTime): diff --git a/replications/remotewrite/writer_test.go b/replications/remotewrite/writer_test.go index 9796be905a..9ac5dd3e4d 100644 --- a/replications/remotewrite/writer_test.go +++ b/replications/remotewrite/writer_test.go @@ -14,6 +14,8 @@ import ( "github.com/golang/mock/gomock" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/influxdata/influxdb/v2/kit/prom" + "github.com/influxdata/influxdb/v2/kit/prom/promtest" "github.com/influxdata/influxdb/v2/replications/metrics" replicationsMock "github.com/influxdata/influxdb/v2/replications/mock" "github.com/stretchr/testify/require" @@ -244,6 +246,102 @@ func TestWrite(t *testing.T) { }) }
+func TestWrite_Metrics(t *testing.T) { + maximumAttemptsBeforeErr := 5 + testData := []byte("this is some data") + + tests := []struct { + name string + status int + data []byte + wantWriteErr error + registerExpectations func(*testing.T, *replicationsMock.MockHttpConfigStore, *influxdb.ReplicationHTTPConfig) + checkMetrics func(*testing.T, *prom.Registry) + }{ + { + name: "server errors", + status: http.StatusTeapot, + data: []byte{}, + wantWriteErr: errors.New("maximum number of attempts exceeded"), + registerExpectations: func(t *testing.T, store *replicationsMock.MockHttpConfigStore, conf *influxdb.ReplicationHTTPConfig) { + store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil).Times(5) + store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTeapot, invalidResponseCode(http.StatusTeapot).Error()).Return(nil).Times(5) + }, + checkMetrics: func(t *testing.T, reg *prom.Registry) { + mfs := promtest.MustGather(t, reg) + + errorCodes := promtest.FindMetric(mfs, "replications_queue_remote_write_errors", map[string]string{ + "replicationID": testID.String(), + "code": strconv.Itoa(http.StatusTeapot), + }) + require.NotNil(t, errorCodes) + require.Equal(t, float64(maximumAttemptsBeforeErr), errorCodes.Counter.GetValue()) + }, + }, + { + name: "successful write", + status: http.StatusNoContent, + data: testData, + wantWriteErr: nil, + registerExpectations: func(t *testing.T, store *replicationsMock.MockHttpConfigStore, conf *influxdb.ReplicationHTTPConfig) { + store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil) + store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusNoContent, "").Return(nil) + }, + checkMetrics: func(t *testing.T, reg *prom.Registry) { + mfs := promtest.MustGather(t, reg) + + bytesSent := promtest.FindMetric(mfs, "replications_queue_remote_write_bytes_sent", map[string]string{ + "replicationID": testID.String(), + }) + require.NotNil(t, bytesSent) + require.Equal(t, 
float64(len(testData)), bytesSent.Counter.GetValue()) + }, + }, + { + name: "dropped data", + status: http.StatusBadRequest, + data: testData, + wantWriteErr: nil, + registerExpectations: func(t *testing.T, store *replicationsMock.MockHttpConfigStore, conf *influxdb.ReplicationHTTPConfig) { + store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil) + store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusBadRequest, invalidResponseCode(http.StatusBadRequest).Error()).Return(nil) + }, + checkMetrics: func(t *testing.T, reg *prom.Registry) { + mfs := promtest.MustGather(t, reg) + + bytesDropped := promtest.FindMetric(mfs, "replications_queue_remote_write_bytes_dropped", map[string]string{ + "replicationID": testID.String(), + }) + require.NotNil(t, bytesDropped) + require.Equal(t, float64(len(testData)), bytesDropped.Counter.GetValue()) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + svr := testServer(t, tt.status, tt.data) + defer svr.Close() + + testConfig := &influxdb.ReplicationHTTPConfig{ + RemoteURL: svr.URL, + DropNonRetryableData: true, + } + + w, configStore := testWriter(t) + reg := prom.NewRegistry(zaptest.NewLogger(t)) + reg.MustRegister(w.metrics.PrometheusCollectors()...) + + w.waitFunc = instaWait() + w.maximumAttemptsBeforeErr = maximumAttemptsBeforeErr + + tt.registerExpectations(t, configStore, testConfig) + require.Equal(t, tt.wantWriteErr, w.Write(context.Background(), tt.data)) + tt.checkMetrics(t, reg) + }) + } +} + func TestPostWrite(t *testing.T) { testData := []byte("some data")