feat: added metrics collection for replications (#22906)

* feat: added metrics collection for replications

* fix: fixed panic when restarting

* fix: fix panic pt2

* chore: self-review fixes

* chore: simplify test
William Baker 2021-11-22 10:40:03 -07:00 committed by GitHub
parent dece95d1dd
commit 3a81166812
7 changed files with 187 additions and 48 deletions

@@ -352,13 +352,15 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
remotesServer := remotesTransport.NewInstrumentedRemotesHandler(
m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc)
replicationSvc := replications.NewService(m.sqlStore, ts, pointsWriter, m.log.With(zap.String("service", "replications")), opts.EnginePath)
replicationSvc, replicationsMetrics := replications.NewService(m.sqlStore, ts, pointsWriter, m.log.With(zap.String("service", "replications")), opts.EnginePath)
replicationServer := replicationTransport.NewInstrumentedReplicationHandler(
m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc)
ts.BucketService = replications.NewBucketService(
m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc)
if feature.ReplicationStreamBackend().Enabled(ctx, m.flagger) {
m.reg.MustRegister(replicationsMetrics.PrometheusCollectors()...)
if err = replicationSvc.Open(ctx); err != nil {
m.log.Error("Failed to open replications service", zap.Error(err))
return err
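
The hunk above is the only launcher-side wiring the new metrics need: NewService now also returns the metrics struct, and its collectors are registered on the launcher's Prometheus registry before the replication service is opened under the feature flag. A minimal sketch of that registration pattern, using a plain client_golang registry instead of the launcher's kit registry (the names here are illustrative, not the launcher's actual fields):

package main

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/influxdata/influxdb/v2/replications/metrics"
)

func main() {
	// Stand-in for the launcher's m.reg.
	reg := prometheus.NewRegistry()

	// The constructor hands the metrics back to the caller, so the caller
	// decides where (and whether) to register them.
	replicationsMetrics := metrics.NewReplicationsMetrics()

	// Registration exposes the replications_queue_* series on whatever
	// /metrics endpoint this registry ultimately backs.
	reg.MustRegister(replicationsMetrics.PrometheusCollectors()...)
}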

@@ -10,16 +10,18 @@ import (
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/pkg/durablequeue"
"github.com/influxdata/influxdb/v2/replications/metrics"
"go.uber.org/zap"
)
type replicationQueue struct {
queue *durablequeue.Queue
wg sync.WaitGroup
done chan struct{}
receive chan struct{}
logger *zap.Logger
id platform.ID
queue *durablequeue.Queue
wg sync.WaitGroup
done chan struct{}
receive chan struct{}
logger *zap.Logger
metrics *metrics.ReplicationsMetrics
writeFunc func([]byte) error
}
@@ -28,16 +30,16 @@ type durableQueueManager struct {
logger *zap.Logger
queuePath string
mutex sync.RWMutex
writeFunc func([]byte) error
metrics *metrics.ReplicationsMetrics
writeFunc func([]byte) error
}
var errStartup = errors.New("startup tasks for replications durable queue management failed, see server logs for details")
var errShutdown = errors.New("shutdown tasks for replications durable queues failed, see server logs for details")
// NewDurableQueueManager creates a new durableQueueManager struct, for managing durable queues associated with
//replication streams.
func NewDurableQueueManager(log *zap.Logger, queuePath string, writeFunc func([]byte) error) *durableQueueManager {
// replication streams.
func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics.ReplicationsMetrics, writeFunc func([]byte) error) *durableQueueManager {
replicationQueues := make(map[platform.ID]*replicationQueue)
os.MkdirAll(queuePath, 0777)
@@ -46,6 +48,7 @@ func NewDurableQueueManager(log *zap.Logger, queuePath string, writeFunc func([]
replicationQueues: replicationQueues,
logger: log,
queuePath: queuePath,
metrics: metrics,
writeFunc: writeFunc,
}
}
@@ -91,14 +94,8 @@ func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQue
}
// Map new durable queue and scanner to its corresponding replication stream via replication ID
rq := replicationQueue{
queue: newQueue,
done: make(chan struct{}),
receive: make(chan struct{}),
logger: qm.logger.With(zap.String("replication_id", replicationID.String())),
writeFunc: qm.writeFunc,
}
qm.replicationQueues[replicationID] = &rq
rq := qm.newReplicationQueue(replicationID, newQueue)
qm.replicationQueues[replicationID] = rq
rq.Open()
qm.logger.Debug("Created new durable queue for replication stream",
@@ -122,6 +119,7 @@ func (rq *replicationQueue) Close() error {
// WriteFunc is currently a placeholder for the "default" behavior
// of the queue scanner sending data from the durable queue to a remote host.
func WriteFunc(b []byte) error {
// TODO: Add metrics updates for BytesSent, BytesDropped, and ErrorCodes
return nil
}
@@ -144,7 +142,6 @@ func (rq *replicationQueue) run() {
// Retryable errors should be handled and retried in the dp function.
// Unprocessable data should be dropped in the dp function.
func (rq *replicationQueue) SendWrite(dp func([]byte) error) bool {
// Any error in creating the scanner should exit the loop in run()
// Either it is io.EOF indicating no data, or some other failure in making
// the Scanner object that we don't know how to handle.
@@ -157,7 +154,6 @@ func (rq *replicationQueue) SendWrite(dp func([]byte) error) bool {
}
for scan.Next() {
// An io.EOF error here indicates that there is no more data
// left to process, and is an expected error.
if scan.Err() == io.EOF {
@@ -180,6 +176,11 @@ func (rq *replicationQueue) SendWrite(dp func([]byte) error) bool {
}
}
// Update metrics after the call to scan.Advance()
defer func() {
rq.metrics.Dequeue(rq.id, rq.queue.DiskUsage())
}()
if _, err = scan.Advance(); err != nil {
if err != io.EOF {
rq.logger.Error("Error in replication queue scanner", zap.Error(err))
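
The deferred call above is deliberate: rq.queue.DiskUsage() must be sampled only after scan.Advance() has returned, so a segment dropped by the advance is already reflected in the gauge. A reduced sketch of that defer ordering (the queue type and sizes below are stand-ins, not the durablequeue implementation):

package main

import "fmt"

type fakeQueue struct{ size int64 }

func (q *fakeQueue) DiskUsage() int64 { return q.size }

func (q *fakeQueue) Advance() error {
	// Advancing past a fully consumed segment shrinks the on-disk size.
	q.size -= 22
	return nil
}

func sendWrite(q *fakeQueue, record func(int64)) {
	// Deferred, so it runs after Advance returns and records the
	// post-advance disk usage, mirroring rq.metrics.Dequeue above.
	defer func() { record(q.DiskUsage()) }()

	if err := q.Advance(); err != nil {
		fmt.Println("advance failed:", err)
	}
}

func main() {
	q := &fakeQueue{size: 52}
	sendWrite(q, func(size int64) {
		fmt.Println("current_bytes_queued =", size) // prints 30
	})
}
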
@@ -285,13 +286,7 @@ func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[pl
errOccurred = true
continue
} else {
qm.replicationQueues[id] = &replicationQueue{
queue: queue,
done: make(chan struct{}),
receive: make(chan struct{}),
logger: qm.logger.With(zap.String("replication_id", id.String())),
writeFunc: qm.writeFunc,
}
qm.replicationQueues[id] = qm.newReplicationQueue(id, queue)
qm.replicationQueues[id].Open()
qm.logger.Info("Opened replication stream", zap.String("id", id.String()), zap.String("path", queue.Dir()))
}
@@ -353,18 +348,34 @@ func (qm *durableQueueManager) CloseAll() error {
}
// EnqueueData persists a set of bytes to a replication's durable queue.
func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byte) error {
func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byte, numPoints int) error {
qm.mutex.RLock()
defer qm.mutex.RUnlock()
if _, exist := qm.replicationQueues[replicationID]; !exist {
rq, ok := qm.replicationQueues[replicationID]
if !ok {
return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
}
if err := qm.replicationQueues[replicationID].queue.Append(data); err != nil {
if err := rq.queue.Append(data); err != nil {
return err
}
// Update metrics for this replication queue when adding data to the queue.
qm.metrics.EnqueueData(replicationID, len(data), numPoints, rq.queue.DiskUsage())
qm.replicationQueues[replicationID].receive <- struct{}{}
return nil
}
func (qm *durableQueueManager) newReplicationQueue(id platform.ID, queue *durablequeue.Queue) *replicationQueue {
return &replicationQueue{
id: id,
queue: queue,
done: make(chan struct{}),
receive: make(chan struct{}),
logger: qm.logger.With(zap.String("replication_id", id.String())),
metrics: qm.metrics,
writeFunc: qm.writeFunc,
}
}
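
Taken together, the changes in this file mean a caller constructs the manager with a shared metrics instance and reports the point count at enqueue time; both counters and the disk-usage gauge are then updated from a single EnqueueData call. A hedged sketch of that call shape (the ID, path, and queue size are made up, and replications/internal is an internal package, so code like this only compiles from packages under replications/):

package main

import (
	"log"

	"github.com/influxdata/influxdb/v2/kit/platform"
	"github.com/influxdata/influxdb/v2/replications/internal"
	"github.com/influxdata/influxdb/v2/replications/metrics"
	"go.uber.org/zap"
)

func main() {
	replicationID := platform.ID(1) // illustrative ID

	qm := internal.NewDurableQueueManager(
		zap.NewNop(),
		"/tmp/replicationq", // illustrative queue path
		metrics.NewReplicationsMetrics(),
		internal.WriteFunc,
	)
	if err := qm.InitializeQueue(replicationID, 67108860); err != nil {
		log.Fatal(err)
	}

	// Two points of line protocol; the caller passes the count so
	// total_points_queued can be updated without re-parsing the batch.
	batch := []byte("m,host=a v=1 1\nm,host=b v=2 2\n")
	if err := qm.EnqueueData(replicationID, batch, 2); err != nil {
		log.Fatal(err)
	}
}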

@@ -8,6 +8,10 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/kit/prom/promtest"
"github.com/influxdata/influxdb/v2/replications/metrics"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
@@ -43,7 +47,7 @@ func TestEnqueueScan(t *testing.T) {
// Enqueue some data
testData := "weather,location=us-midwest temperature=82 1465839830100400200"
qm.writeFunc = getTestWriteFunc(t, testData)
err = qm.EnqueueData(id1, []byte(testData))
err = qm.EnqueueData(id1, []byte(testData), 1)
require.NoError(t, err)
}
@@ -60,10 +64,10 @@ func TestEnqueueScanMultiple(t *testing.T) {
// Enqueue some data
testData := "weather,location=us-midwest temperature=82 1465839830100400200"
qm.writeFunc = getTestWriteFunc(t, testData)
err = qm.EnqueueData(id1, []byte(testData))
err = qm.EnqueueData(id1, []byte(testData), 1)
require.NoError(t, err)
err = qm.EnqueueData(id1, []byte(testData))
err = qm.EnqueueData(id1, []byte(testData), 1)
require.NoError(t, err)
}
@@ -270,7 +274,7 @@ func initQueueManager(t *testing.T) (string, *durableQueueManager) {
queuePath := filepath.Join(enginePath, "replicationq")
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, queuePath, WriteFunc)
qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), WriteFunc)
return queuePath, qm
}
@@ -303,7 +307,7 @@ func TestEnqueueData(t *testing.T) {
defer os.RemoveAll(queuePath)
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, queuePath, WriteFunc)
qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), WriteFunc)
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
require.DirExists(t, filepath.Join(queuePath, id1.String()))
@@ -321,7 +325,7 @@ func TestEnqueueData(t *testing.T) {
close(rq.done)
go func() { <-rq.receive }() // absorb the receive to avoid testcase deadlock
require.NoError(t, qm.EnqueueData(id1, []byte(data)))
require.NoError(t, qm.EnqueueData(id1, []byte(data), 1))
sizes, err = qm.CurrentQueueSizes([]platform.ID{id1})
require.NoError(t, err)
require.Greater(t, sizes[id1], int64(8))
@@ -332,6 +336,62 @@ func TestEnqueueData(t *testing.T) {
require.Equal(t, data, string(written))
}
func TestEnqueueData_WithMetrics(t *testing.T) {
t.Parallel()
path, qm := initQueueManager(t)
defer os.RemoveAll(path)
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
require.DirExists(t, filepath.Join(path, id1.String()))
// close the scanner goroutine to specifically test EnqueueData()
rq, ok := qm.replicationQueues[id1]
require.True(t, ok)
close(rq.done)
reg := prom.NewRegistry(zaptest.NewLogger(t))
reg.MustRegister(qm.metrics.PrometheusCollectors()...)
data := []byte("some fake data")
numPointsPerData := 3
numDataToAdd := 4
for i := 1; i <= numDataToAdd; i++ {
go func() { <-rq.receive }() // absorb the receive to avoid testcase deadlock
require.NoError(t, qm.EnqueueData(id1, data, numPointsPerData))
pointCount := getPromMetric(t, "replications_queue_total_points_queued", reg)
require.Equal(t, i*numPointsPerData, int(pointCount.Counter.GetValue()))
totalBytesQueued := getPromMetric(t, "replications_queue_total_bytes_queued", reg)
require.Equal(t, i*len(data), int(totalBytesQueued.Counter.GetValue()))
currentBytesQueued := getPromMetric(t, "replications_queue_current_bytes_queued", reg)
// 8 bytes for an empty queue; 8 extra bytes for each byte slice appended to the queue
require.Equal(t, 8+i*(8+len(data)), int(currentBytesQueued.Gauge.GetValue()))
}
// Reduce the max segment size so that a new segment is created & the next call to SendWrite causes the first
// segment to be dropped and the queue size on disk to be lower than before when the queue head is advanced.
require.NoError(t, rq.queue.SetMaxSegmentSize(8))
queueSizeBefore := rq.queue.DiskUsage()
rq.SendWrite(func(bytes []byte) error {
return nil
})
// Ensure that the smaller queue disk size was reflected in the metrics.
currentBytesQueued := getPromMetric(t, "replications_queue_current_bytes_queued", reg)
require.Less(t, int64(currentBytesQueued.Gauge.GetValue()), queueSizeBefore)
}
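
For concreteness, data above is the 14-byte string "some fake data", so the expected gauge values in the loop are 8 + i*(8+14) = 30, 52, 74, and 96 bytes for i = 1 through 4. The closing require.Less then checks that dropping the artificially small first segment brings the reported disk usage back below its pre-advance value.
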
func getPromMetric(t *testing.T, name string, reg *prom.Registry) *dto.Metric {
mfs := promtest.MustGather(t, reg)
return promtest.FindMetric(mfs, name, map[string]string{
"replicationID": id1.String(),
})
}
func TestGoroutineReceives(t *testing.T) {
t.Parallel()
@@ -345,7 +405,7 @@ func TestGoroutineReceives(t *testing.T) {
require.NotNil(t, rq)
close(rq.done) // atypical from normal behavior, but lets us receive channels to test
go func() { require.NoError(t, qm.EnqueueData(id1, []byte("1234"))) }()
go func() { require.NoError(t, qm.EnqueueData(id1, []byte("1234"), 1)) }()
select {
case <-rq.receive:
return

View File

@@ -0,0 +1,59 @@
package metrics
import (
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/prometheus/client_golang/prometheus"
)
type ReplicationsMetrics struct {
TotalPointsQueued *prometheus.CounterVec
TotalBytesQueued *prometheus.CounterVec
CurrentBytesQueued *prometheus.GaugeVec
}
func NewReplicationsMetrics() *ReplicationsMetrics {
const namespace = "replications"
const subsystem = "queue"
return &ReplicationsMetrics{
TotalPointsQueued: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_points_queued",
Help: "Sum of all points that have been added to the replication stream queue",
}, []string{"replicationID"}),
TotalBytesQueued: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_bytes_queued",
Help: "Sum of all bytes that have been added to the replication stream queue",
}, []string{"replicationID"}),
CurrentBytesQueued: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "current_bytes_queued",
Help: "Current number of bytes in the replication stream queue",
}, []string{"replicationID"}),
}
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (rm *ReplicationsMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
rm.TotalPointsQueued,
rm.TotalBytesQueued,
rm.CurrentBytesQueued,
}
}
// EnqueueData updates the metrics when adding new data to a replication queue.
func (rm *ReplicationsMetrics) EnqueueData(replicationID platform.ID, numBytes, numPoints int, queueSizeOnDisk int64) {
rm.TotalPointsQueued.WithLabelValues(replicationID.String()).Add(float64(numPoints))
rm.TotalBytesQueued.WithLabelValues(replicationID.String()).Add(float64(numBytes))
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSizeOnDisk))
}
// Dequeue updates the metrics when data has been removed from the queue.
func (rm *ReplicationsMetrics) Dequeue(replicationID platform.ID, queueSizeOnDisk int64) {
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSizeOnDisk))
}
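
With namespace "replications" and subsystem "queue", the registered series come out as replications_queue_total_points_queued, replications_queue_total_bytes_queued, and replications_queue_current_bytes_queued, each labelled by replicationID. A hedged sketch of the metrics in isolation (the ID, byte counts, and disk sizes are invented; testutil is client_golang's test helper, not part of this commit):

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/v2/kit/platform"
	"github.com/influxdata/influxdb/v2/replications/metrics"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	rm := metrics.NewReplicationsMetrics()
	id := platform.ID(42) // illustrative replication ID

	// Two enqueues of 100 bytes / 3 points each; the queue reports its
	// on-disk size after each append.
	rm.EnqueueData(id, 100, 3, 216)
	rm.EnqueueData(id, 100, 3, 324)

	// Dequeueing only refreshes the gauge; the counters never go down.
	rm.Dequeue(id, 8)

	fmt.Println(testutil.ToFloat64(rm.TotalPointsQueued.WithLabelValues(id.String())))  // 6
	fmt.Println(testutil.ToFloat64(rm.TotalBytesQueued.WithLabelValues(id.String())))   // 200
	fmt.Println(testutil.ToFloat64(rm.CurrentBytesQueued.WithLabelValues(id.String()))) // 8
}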

@@ -78,17 +78,17 @@ func (mr *MockDurableQueueManagerMockRecorder) DeleteQueue(arg0 interface{}) *go
}
// EnqueueData mocks base method.
func (m *MockDurableQueueManager) EnqueueData(arg0 platform.ID, arg1 []byte) error {
func (m *MockDurableQueueManager) EnqueueData(arg0 platform.ID, arg1 []byte, arg2 int) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "EnqueueData", arg0, arg1)
ret := m.ctrl.Call(m, "EnqueueData", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// EnqueueData indicates an expected call of EnqueueData.
func (mr *MockDurableQueueManagerMockRecorder) EnqueueData(arg0, arg1 interface{}) *gomock.Call {
func (mr *MockDurableQueueManagerMockRecorder) EnqueueData(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueData", reflect.TypeOf((*MockDurableQueueManager)(nil).EnqueueData), arg0, arg1)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueData", reflect.TypeOf((*MockDurableQueueManager)(nil).EnqueueData), arg0, arg1, arg2)
}
// InitializeQueue mocks base method.

@@ -16,6 +16,7 @@ import (
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/replications/internal"
"github.com/influxdata/influxdb/v2/replications/metrics"
"github.com/influxdata/influxdb/v2/snowflake"
"github.com/influxdata/influxdb/v2/sqlite"
"github.com/influxdata/influxdb/v2/storage"
@@ -45,7 +46,9 @@ func errLocalBucketNotFound(id platform.ID, cause error) error {
}
}
func NewService(store *sqlite.SqlStore, bktSvc BucketService, localWriter storage.PointsWriter, log *zap.Logger, enginePath string) *service {
func NewService(store *sqlite.SqlStore, bktSvc BucketService, localWriter storage.PointsWriter, log *zap.Logger, enginePath string) (*service, *metrics.ReplicationsMetrics) {
metrs := metrics.NewReplicationsMetrics()
return &service{
store: store,
idGenerator: snowflake.NewIDGenerator(),
@@ -56,9 +59,10 @@ func NewService(store *sqlite.SqlStore, bktSvc BucketService, localWriter storag
durableQueueManager: internal.NewDurableQueueManager(
log,
filepath.Join(enginePath, "replicationq"),
metrs,
internal.WriteFunc,
),
}
}, metrs
}
type ReplicationValidator interface {
@@ -78,7 +82,7 @@ type DurableQueueManager interface {
CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
StartReplicationQueues(trackedReplications map[platform.ID]int64) error
CloseAll() error
EnqueueData(replicationID platform.ID, data []byte) error
EnqueueData(replicationID platform.ID, data []byte, numPoints int) error
}
type service struct {
@@ -459,9 +463,10 @@ func (s service) WritePoints(ctx context.Context, orgID platform.ID, bucketID pl
for _, id := range ids {
go func(id platform.ID) {
defer wg.Done()
if err := s.durableQueueManager.EnqueueData(id, buf.Bytes()); err != nil {
if err := s.durableQueueManager.EnqueueData(id, buf.Bytes(), len(points)); err != nil {
s.log.Error("Failed to enqueue points for replication", zap.String("id", id.String()), zap.Error(err))
}
}(id)
}
wg.Wait()
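
One consequence of this call site: buf holds the gzip-compressed batch (the test below gunzips what was enqueued), so len(data) inside EnqueueData, and therefore total_bytes_queued, measures compressed bytes, while numPoints counts the parsed points. A small illustration of that distinction (the line-protocol sample is invented and the compression ratio will vary):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"log"
)

func main() {
	lp := []byte("mem,host=host1 used_percent=23.4 1641024000000000000\n")

	var buf bytes.Buffer
	gz := gzip.NewWriter(&buf)
	if _, err := gz.Write(lp); err != nil {
		log.Fatal(err)
	}
	if err := gz.Close(); err != nil {
		log.Fatal(err)
	}

	// total_bytes_queued would grow by buf.Len() here, not len(lp).
	fmt.Printf("raw=%d bytes, enqueued (gzip)=%d bytes, points=1\n", len(lp), buf.Len())
}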

@@ -645,8 +645,10 @@ disk,host=C value=1.3 1000000000`)
// Points should successfully be enqueued in the 2 replications associated with the local bucket.
for _, id := range []platform.ID{initID, initID + 2} {
mocks.durableQueueManager.EXPECT().
EnqueueData(id, gomock.Any()).
DoAndReturn(func(_ platform.ID, data []byte) error {
EnqueueData(id, gomock.Any(), len(points)).
DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
require.Equal(t, len(points), numPoints)
gzBuf := bytes.NewBuffer(data)
gzr, err := gzip.NewReader(gzBuf)
require.NoError(t, err)