feat: add logging and metrics middlewares to remotes API (#22258)
parent
585ff40968
commit
69988ae68d
|
@ -959,13 +959,13 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
|
|||
authorizer.NewAnnotationService(annotationSvc),
|
||||
)
|
||||
|
||||
remotesSvc := remotes.NewService(
|
||||
m.log.With(zap.String("service", "remotes")),
|
||||
m.sqlStore,
|
||||
)
|
||||
remotesSvc := remotes.NewService(m.sqlStore)
|
||||
remotesServer := remotesTransport.NewRemoteConnectionHandler(
|
||||
m.log.With(zap.String("handler", "remotes")),
|
||||
remotesSvc,
|
||||
remotes.NewLoggingService(
|
||||
m.log.With(zap.String("service", "remotes")),
|
||||
remotes.NewMetricCollectingService(m.reg, remotesSvc),
|
||||
),
|
||||
)
|
||||
|
||||
platformHandler := http.NewPlatformHandler(
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
package remotes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func NewLoggingService(logger *zap.Logger, underlying influxdb.RemoteConnectionService) *loggingService {
|
||||
return &loggingService{
|
||||
logger: logger,
|
||||
underlying: underlying,
|
||||
}
|
||||
}
|
||||
|
||||
type loggingService struct {
|
||||
logger *zap.Logger
|
||||
underlying influxdb.RemoteConnectionService
|
||||
}
|
||||
|
||||
var _ influxdb.RemoteConnectionService = (*loggingService)(nil)
|
||||
|
||||
func (l loggingService) ListRemoteConnections(ctx context.Context, filter influxdb.RemoteConnectionListFilter) (cs *influxdb.RemoteConnections, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to find remotes", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("remotes find", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.ListRemoteConnections(ctx, filter)
|
||||
}
|
||||
|
||||
func (l loggingService) CreateRemoteConnection(ctx context.Context, request influxdb.CreateRemoteConnectionRequest) (r *influxdb.RemoteConnection, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to create remote", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("remote create", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.CreateRemoteConnection(ctx, request)
|
||||
}
|
||||
|
||||
func (l loggingService) ValidateNewRemoteConnection(ctx context.Context, request influxdb.CreateRemoteConnectionRequest) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to validate remote create", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("remote validate create", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.ValidateNewRemoteConnection(ctx, request)
|
||||
}
|
||||
|
||||
func (l loggingService) GetRemoteConnection(ctx context.Context, id platform.ID) (r *influxdb.RemoteConnection, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to find remote by ID", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("remote find by ID", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.GetRemoteConnection(ctx, id)
|
||||
}
|
||||
|
||||
func (l loggingService) UpdateRemoteConnection(ctx context.Context, id platform.ID, request influxdb.UpdateRemoteConnectionRequest) (r *influxdb.RemoteConnection, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to update remote", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("remote update", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.UpdateRemoteConnection(ctx, id, request)
|
||||
}
|
||||
|
||||
func (l loggingService) ValidateUpdatedRemoteConnection(ctx context.Context, id platform.ID, request influxdb.UpdateRemoteConnectionRequest) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to validate remote update", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("remote validate update", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.ValidateUpdatedRemoteConnection(ctx, id, request)
|
||||
}
|
||||
|
||||
func (l loggingService) DeleteRemoteConnection(ctx context.Context, id platform.ID) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to delete remote", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("remote delete", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.DeleteRemoteConnection(ctx, id)
|
||||
}
|
||||
|
||||
func (l loggingService) ValidateRemoteConnection(ctx context.Context, id platform.ID) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to validate remote", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("remote validate", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.ValidateRemoteConnection(ctx, id)
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
package remotes
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/metric"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
func NewMetricCollectingService(reg prometheus.Registerer, underlying influxdb.RemoteConnectionService, opts ...metric.ClientOptFn) *metricsService {
|
||||
o := metric.ApplyMetricOpts(opts...)
|
||||
return &metricsService{
|
||||
rec: metric.New(reg, o.ApplySuffix("remote")),
|
||||
underlying: underlying,
|
||||
}
|
||||
}
|
||||
|
||||
type metricsService struct {
|
||||
// RED metrics
|
||||
rec *metric.REDClient
|
||||
underlying influxdb.RemoteConnectionService
|
||||
}
|
||||
|
||||
var _ influxdb.RemoteConnectionService = (*metricsService)(nil)
|
||||
|
||||
func (m metricsService) ListRemoteConnections(ctx context.Context, filter influxdb.RemoteConnectionListFilter) (*influxdb.RemoteConnections, error) {
|
||||
rec := m.rec.Record("find_remotes")
|
||||
rcs, err := m.underlying.ListRemoteConnections(ctx, filter)
|
||||
return rcs, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) CreateRemoteConnection(ctx context.Context, request influxdb.CreateRemoteConnectionRequest) (*influxdb.RemoteConnection, error) {
|
||||
rec := m.rec.Record("create_remote")
|
||||
rc, err := m.underlying.CreateRemoteConnection(ctx, request)
|
||||
return rc, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) ValidateNewRemoteConnection(ctx context.Context, request influxdb.CreateRemoteConnectionRequest) error {
|
||||
rec := m.rec.Record("validate_create_remote")
|
||||
return rec(m.underlying.ValidateNewRemoteConnection(ctx, request))
|
||||
}
|
||||
|
||||
func (m metricsService) GetRemoteConnection(ctx context.Context, id platform.ID) (*influxdb.RemoteConnection, error) {
|
||||
rec := m.rec.Record("find_remote_by_id")
|
||||
rc, err := m.underlying.GetRemoteConnection(ctx, id)
|
||||
return rc, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) UpdateRemoteConnection(ctx context.Context, id platform.ID, request influxdb.UpdateRemoteConnectionRequest) (*influxdb.RemoteConnection, error) {
|
||||
rec := m.rec.Record("update_remote")
|
||||
rc, err := m.underlying.UpdateRemoteConnection(ctx, id, request)
|
||||
return rc, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) ValidateUpdatedRemoteConnection(ctx context.Context, id platform.ID, request influxdb.UpdateRemoteConnectionRequest) error {
|
||||
rec := m.rec.Record("validate_update_remote")
|
||||
return rec(m.underlying.ValidateUpdatedRemoteConnection(ctx, id, request))
|
||||
}
|
||||
|
||||
func (m metricsService) DeleteRemoteConnection(ctx context.Context, id platform.ID) error {
|
||||
rec := m.rec.Record("delete_remote")
|
||||
return rec(m.underlying.DeleteRemoteConnection(ctx, id))
|
||||
}
|
||||
|
||||
func (m metricsService) ValidateRemoteConnection(ctx context.Context, id platform.ID) error {
|
||||
rec := m.rec.Record("validate_remote")
|
||||
return rec(m.underlying.ValidateRemoteConnection(ctx, id))
|
||||
}
|
|
@ -11,7 +11,6 @@ import (
|
|||
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
|
||||
"github.com/influxdata/influxdb/v2/snowflake"
|
||||
"github.com/influxdata/influxdb/v2/sqlite"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -29,10 +28,9 @@ type RemoteConnectionValidator interface {
|
|||
ValidateRemoteConnectionHTTPConfig(context.Context, *influxdb.RemoteConnectionHTTPConfig) error
|
||||
}
|
||||
|
||||
func NewService(logger *zap.Logger, store *sqlite.SqlStore) *service {
|
||||
func NewService(store *sqlite.SqlStore) *service {
|
||||
return &service{
|
||||
store: store,
|
||||
log: logger,
|
||||
idGenerator: snowflake.NewIDGenerator(),
|
||||
validator: &stubValidator{},
|
||||
}
|
||||
|
@ -40,7 +38,6 @@ func NewService(logger *zap.Logger, store *sqlite.SqlStore) *service {
|
|||
|
||||
type service struct {
|
||||
store *sqlite.SqlStore
|
||||
log *zap.Logger
|
||||
idGenerator platform.IDGenerator
|
||||
validator RemoteConnectionValidator
|
||||
}
|
||||
|
|
|
@ -313,7 +313,6 @@ func newTestService(t *testing.T) (*service, *mock.MockRemoteConnectionValidator
|
|||
mockValidator := mock.NewMockRemoteConnectionValidator(gomock.NewController(t))
|
||||
svc := service{
|
||||
store: store,
|
||||
log: logger,
|
||||
idGenerator: mock.NewIncrementingIDGenerator(platform.ID(1)),
|
||||
validator: mockValidator,
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue