feat: add logging and metrics middlewares to replications API (#22291)
parent
11c97470a5
commit
b37ad79e20
|
@ -977,7 +977,10 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
|
|||
replicationSvc := replications.NewService()
|
||||
replicationServer := replicationTransport.NewReplicationHandler(
|
||||
m.log.With(zap.String("handler", "replications")),
|
||||
replicationSvc,
|
||||
replications.NewLoggingService(
|
||||
m.log.With(zap.String("service", "replications")),
|
||||
replications.NewMetricCollectingService(m.reg, replicationSvc),
|
||||
),
|
||||
)
|
||||
|
||||
platformHandler := http.NewPlatformHandler(
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
package replications
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func NewLoggingService(logger *zap.Logger, underlying influxdb.ReplicationService) *loggingService {
|
||||
return &loggingService{
|
||||
logger: logger,
|
||||
underlying: underlying,
|
||||
}
|
||||
}
|
||||
|
||||
type loggingService struct {
|
||||
logger *zap.Logger
|
||||
underlying influxdb.ReplicationService
|
||||
}
|
||||
|
||||
var _ influxdb.ReplicationService = (*loggingService)(nil)
|
||||
|
||||
func (l loggingService) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (rs *influxdb.Replications, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to find replications", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("replications find", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.ListReplications(ctx, filter)
|
||||
}
|
||||
|
||||
func (l loggingService) CreateReplication(ctx context.Context, request influxdb.CreateReplicationRequest) (r *influxdb.Replication, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to create replication", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("replication create", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.CreateReplication(ctx, request)
|
||||
}
|
||||
|
||||
func (l loggingService) ValidateNewReplication(ctx context.Context, request influxdb.CreateReplicationRequest) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to validate replication create", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("replication validate create", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.ValidateNewReplication(ctx, request)
|
||||
}
|
||||
|
||||
func (l loggingService) GetReplication(ctx context.Context, id platform.ID) (r *influxdb.Replication, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to find replication by ID", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("replication find by ID", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.GetReplication(ctx, id)
|
||||
}
|
||||
|
||||
func (l loggingService) UpdateReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) (r *influxdb.Replication, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to update replication", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("replication update", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.UpdateReplication(ctx, id, request)
|
||||
}
|
||||
|
||||
func (l loggingService) ValidateUpdatedReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to validate replication update", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("replication validate update", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.ValidateUpdatedReplication(ctx, id, request)
|
||||
}
|
||||
|
||||
func (l loggingService) DeleteReplication(ctx context.Context, id platform.ID) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to delete replication", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("replication delete", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.DeleteReplication(ctx, id)
|
||||
}
|
||||
|
||||
func (l loggingService) ValidateReplication(ctx context.Context, id platform.ID) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to validate replication", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("replication validate", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.ValidateReplication(ctx, id)
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
package replications
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/metric"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
func NewMetricCollectingService(reg prometheus.Registerer, underlying influxdb.ReplicationService, opts ...metric.ClientOptFn) *metricsService {
|
||||
o := metric.ApplyMetricOpts(opts...)
|
||||
return &metricsService{
|
||||
rec: metric.New(reg, o.ApplySuffix("replication")),
|
||||
underlying: underlying,
|
||||
}
|
||||
}
|
||||
|
||||
type metricsService struct {
|
||||
rec *metric.REDClient
|
||||
underlying influxdb.ReplicationService
|
||||
}
|
||||
|
||||
var _ influxdb.ReplicationService = (*metricsService)(nil)
|
||||
|
||||
func (m metricsService) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
|
||||
rec := m.rec.Record("find_replications")
|
||||
rcs, err := m.underlying.ListReplications(ctx, filter)
|
||||
return rcs, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) CreateReplication(ctx context.Context, request influxdb.CreateReplicationRequest) (*influxdb.Replication, error) {
|
||||
rec := m.rec.Record("create_replication")
|
||||
r, err := m.underlying.CreateReplication(ctx, request)
|
||||
return r, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) ValidateNewReplication(ctx context.Context, request influxdb.CreateReplicationRequest) error {
|
||||
rec := m.rec.Record("validate_create_replication")
|
||||
return rec(m.underlying.ValidateNewReplication(ctx, request))
|
||||
}
|
||||
|
||||
func (m metricsService) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) {
|
||||
rec := m.rec.Record("find_replication_by_id")
|
||||
r, err := m.underlying.GetReplication(ctx, id)
|
||||
return r, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) UpdateReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) (*influxdb.Replication, error) {
|
||||
rec := m.rec.Record("update_replication")
|
||||
r, err := m.underlying.UpdateReplication(ctx, id, request)
|
||||
return r, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) ValidateUpdatedReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) error {
|
||||
rec := m.rec.Record("validate_update_replication")
|
||||
return rec(m.underlying.ValidateUpdatedReplication(ctx, id, request))
|
||||
}
|
||||
|
||||
func (m metricsService) DeleteReplication(ctx context.Context, id platform.ID) error {
|
||||
rec := m.rec.Record("delete_replication")
|
||||
return rec(m.underlying.DeleteReplication(ctx, id))
|
||||
}
|
||||
|
||||
func (m metricsService) ValidateReplication(ctx context.Context, id platform.ID) error {
|
||||
rec := m.rec.Record("validate_replication")
|
||||
return rec(m.underlying.ValidateReplication(ctx, id))
|
||||
}
|
Loading…
Reference in New Issue