From 467040a072511911bee79af9c861feaa6e0377be Mon Sep 17 00:00:00 2001 From: William Baker Date: Fri, 20 Aug 2021 14:28:37 -0600 Subject: [PATCH] feat(annotations): add logging and metrics to annotations service (#22271) --- annotations/middleware_logging.go | 168 ++++++++++++++++++++++++++++++ annotations/middleware_metrics.go | 94 +++++++++++++++++ annotations/service.go | 5 +- annotations/service_test.go | 2 +- cmd/influxd/launcher/launcher.go | 12 ++- 5 files changed, 271 insertions(+), 10 deletions(-) create mode 100644 annotations/middleware_logging.go create mode 100644 annotations/middleware_metrics.go diff --git a/annotations/middleware_logging.go b/annotations/middleware_logging.go new file mode 100644 index 0000000000..b1471d7d7a --- /dev/null +++ b/annotations/middleware_logging.go @@ -0,0 +1,168 @@ +package annotations + +import ( + "context" + "time" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/platform" + "go.uber.org/zap" +) + +func NewLoggingService(logger *zap.Logger, underlying influxdb.AnnotationService) *loggingService { + return &loggingService{ + logger: logger, + underlying: underlying, + } +} + +type loggingService struct { + logger *zap.Logger + underlying influxdb.AnnotationService +} + +var _ influxdb.AnnotationService = (*loggingService)(nil) + +func (l loggingService) CreateAnnotations(ctx context.Context, orgID platform.ID, create []influxdb.AnnotationCreate) (an []influxdb.AnnotationEvent, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to create annotations", zap.Error(err), dur) + return + } + l.logger.Debug("annotations create", dur) + }(time.Now()) + return l.underlying.CreateAnnotations(ctx, orgID, create) +} + +func (l loggingService) ListAnnotations(ctx context.Context, orgID platform.ID, filter influxdb.AnnotationListFilter) (an []influxdb.StoredAnnotation, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to find annotations", zap.Error(err), dur) + return + } + l.logger.Debug("annotations find", dur) + }(time.Now()) + return l.underlying.ListAnnotations(ctx, orgID, filter) +} + +func (l loggingService) GetAnnotation(ctx context.Context, id platform.ID) (an *influxdb.StoredAnnotation, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to find annotation by ID", zap.Error(err), dur) + return + } + l.logger.Debug("annotation find by ID", dur) + }(time.Now()) + return l.underlying.GetAnnotation(ctx, id) +} + +func (l loggingService) DeleteAnnotations(ctx context.Context, orgID platform.ID, delete influxdb.AnnotationDeleteFilter) (err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to delete annotations", zap.Error(err), dur) + return + } + l.logger.Debug("annotations delete", dur) + }(time.Now()) + return l.underlying.DeleteAnnotations(ctx, orgID, delete) +} + +func (l loggingService) DeleteAnnotation(ctx context.Context, id platform.ID) (err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to delete annotation", zap.Error(err), dur) + return + } + l.logger.Debug("annotation delete", dur) + }(time.Now()) + return l.underlying.DeleteAnnotation(ctx, id) +} + +func (l loggingService) UpdateAnnotation(ctx context.Context, id platform.ID, update influxdb.AnnotationCreate) (an *influxdb.AnnotationEvent, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to update annotation", zap.Error(err), dur) + return + } + l.logger.Debug("annotation update", dur) + }(time.Now()) + return l.underlying.UpdateAnnotation(ctx, id, update) +} + +func (l loggingService) ListStreams(ctx context.Context, orgID platform.ID, filter influxdb.StreamListFilter) (stm []influxdb.StoredStream, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to find streams", zap.Error(err), dur) + return + } + l.logger.Debug("streams find", dur) + }(time.Now()) + return l.underlying.ListStreams(ctx, orgID, filter) +} + +func (l loggingService) CreateOrUpdateStream(ctx context.Context, orgID platform.ID, stream influxdb.Stream) (stm *influxdb.ReadStream, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to create or update stream", zap.Error(err), dur) + return + } + l.logger.Debug("stream create or update", dur) + }(time.Now()) + return l.underlying.CreateOrUpdateStream(ctx, orgID, stream) +} + +func (l loggingService) UpdateStream(ctx context.Context, id platform.ID, stream influxdb.Stream) (stm *influxdb.ReadStream, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to update stream", zap.Error(err), dur) + return + } + l.logger.Debug("stream update", dur) + }(time.Now()) + return l.underlying.UpdateStream(ctx, id, stream) +} + +func (l loggingService) GetStream(ctx context.Context, id platform.ID) (stm *influxdb.StoredStream, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to find stream by ID", zap.Error(err), dur) + return + } + l.logger.Debug("stream find by ID", dur) + }(time.Now()) + return l.underlying.GetStream(ctx, id) +} + +func (l loggingService) DeleteStreams(ctx context.Context, orgID platform.ID, delete influxdb.BasicStream) (err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to delete streams", zap.Error(err), dur) + return + } + l.logger.Debug("streams delete", dur) + }(time.Now()) + return l.underlying.DeleteStreams(ctx, orgID, delete) +} + +func (l loggingService) DeleteStreamByID(ctx context.Context, id platform.ID) (err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Debug("failed to delete stream", zap.Error(err), dur) + return + } + l.logger.Debug("stream delete", dur) + }(time.Now()) + return l.underlying.DeleteStreamByID(ctx, id) +} diff --git a/annotations/middleware_metrics.go b/annotations/middleware_metrics.go new file mode 100644 index 0000000000..d760934877 --- /dev/null +++ b/annotations/middleware_metrics.go @@ -0,0 +1,94 @@ +package annotations + +import ( + "context" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/metric" + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/prometheus/client_golang/prometheus" +) + +func NewMetricCollectingService(reg prometheus.Registerer, underlying influxdb.AnnotationService, opts ...metric.ClientOptFn) *metricsService { + o := metric.ApplyMetricOpts(opts...) + return &metricsService{ + rec: metric.New(reg, o.ApplySuffix("annotation")), + underlying: underlying, + } +} + +type metricsService struct { + // RED metrics + rec *metric.REDClient + underlying influxdb.AnnotationService +} + +var _ influxdb.AnnotationService = (*metricsService)(nil) + +func (m metricsService) CreateAnnotations(ctx context.Context, orgID platform.ID, create []influxdb.AnnotationCreate) ([]influxdb.AnnotationEvent, error) { + rec := m.rec.Record("create_annotation") + ans, err := m.underlying.CreateAnnotations(ctx, orgID, create) + return ans, rec(err) +} + +func (m metricsService) ListAnnotations(ctx context.Context, orgID platform.ID, filter influxdb.AnnotationListFilter) ([]influxdb.StoredAnnotation, error) { + rec := m.rec.Record("find_annotations") + ans, err := m.underlying.ListAnnotations(ctx, orgID, filter) + return ans, rec(err) +} + +func (m metricsService) GetAnnotation(ctx context.Context, id platform.ID) (*influxdb.StoredAnnotation, error) { + rec := m.rec.Record("find_annotation_by_id") + an, err := m.underlying.GetAnnotation(ctx, id) + return an, rec(err) +} + +func (m metricsService) DeleteAnnotations(ctx context.Context, orgID platform.ID, delete influxdb.AnnotationDeleteFilter) error { + rec := m.rec.Record("delete_annotations") + return rec(m.underlying.DeleteAnnotations(ctx, orgID, delete)) +} + +func (m metricsService) DeleteAnnotation(ctx context.Context, id platform.ID) error { + rec := m.rec.Record("delete_annotation") + return rec(m.underlying.DeleteAnnotation(ctx, id)) +} + +func (m metricsService) UpdateAnnotation(ctx context.Context, id platform.ID, update influxdb.AnnotationCreate) (*influxdb.AnnotationEvent, error) { + rec := m.rec.Record("update_annotation") + an, err := m.underlying.UpdateAnnotation(ctx, id, update) + return an, rec(err) +} + +func (m metricsService) ListStreams(ctx context.Context, orgID platform.ID, filter influxdb.StreamListFilter) ([]influxdb.StoredStream, error) { + rec := m.rec.Record("find_streams") + stms, err := m.underlying.ListStreams(ctx, orgID, filter) + return stms, rec(err) +} + +func (m metricsService) CreateOrUpdateStream(ctx context.Context, orgID platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) { + rec := m.rec.Record("create_or_update_stream") + stm, err := m.underlying.CreateOrUpdateStream(ctx, orgID, stream) + return stm, rec(err) +} + +func (m metricsService) GetStream(ctx context.Context, id platform.ID) (*influxdb.StoredStream, error) { + rec := m.rec.Record("find_stream_by_id") + stm, err := m.underlying.GetStream(ctx, id) + return stm, rec(err) +} + +func (m metricsService) UpdateStream(ctx context.Context, id platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) { + rec := m.rec.Record("update_stream") + stm, err := m.underlying.UpdateStream(ctx, id, stream) + return stm, rec(err) +} + +func (m metricsService) DeleteStreams(ctx context.Context, orgID platform.ID, delete influxdb.BasicStream) error { + rec := m.rec.Record("delete_streams") + return rec(m.underlying.DeleteStreams(ctx, orgID, delete)) +} + +func (m metricsService) DeleteStreamByID(ctx context.Context, id platform.ID) error { + rec := m.rec.Record("delete_stream") + return rec(m.underlying.DeleteStreamByID(ctx, id)) +} diff --git a/annotations/service.go b/annotations/service.go index 25c7f95718..7e607b8f2e 100644 --- a/annotations/service.go +++ b/annotations/service.go @@ -13,7 +13,6 @@ import ( ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors" "github.com/influxdata/influxdb/v2/snowflake" "github.com/influxdata/influxdb/v2/sqlite" - "go.uber.org/zap" ) var ( @@ -31,14 +30,12 @@ var _ influxdb.AnnotationService = (*Service)(nil) type Service struct { store *sqlite.SqlStore - log *zap.Logger idGenerator platform.IDGenerator } -func NewService(logger *zap.Logger, store *sqlite.SqlStore) *Service { +func NewService(store *sqlite.SqlStore) *Service { return &Service{ store: store, - log: logger, idGenerator: snowflake.NewIDGenerator(), } } diff --git a/annotations/service_test.go b/annotations/service_test.go index 169c68aded..9ec0fbc9b4 100644 --- a/annotations/service_test.go +++ b/annotations/service_test.go @@ -1031,7 +1031,7 @@ func newTestService(t *testing.T) (*Service, func(t *testing.T)) { err := sqliteMigrator.Up(ctx, migrations.All) require.NoError(t, err) - svc := NewService(zap.NewNop(), store) + svc := NewService(store) return svc, clean } diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index aa209a6c1d..d7f70dd968 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -952,13 +952,15 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { ), ) - annotationSvc := annotations.NewService( - m.log.With(zap.String("service", "annotations")), - m.sqlStore, - ) + annotationSvc := annotations.NewService(m.sqlStore) annotationServer := annotationTransport.NewAnnotationHandler( m.log.With(zap.String("handler", "annotations")), - authorizer.NewAnnotationService(annotationSvc), + authorizer.NewAnnotationService( + annotations.NewLoggingService( + m.log.With(zap.String("service", "annotations")), + annotations.NewMetricCollectingService(m.reg, annotationSvc), + ), + ), ) remotesSvc := remotes.NewService(m.sqlStore)