feat(annotations): add logging and metrics to annotations service (#22271)
parent
aee9f817d8
commit
467040a072
|
@ -0,0 +1,168 @@
|
|||
package annotations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func NewLoggingService(logger *zap.Logger, underlying influxdb.AnnotationService) *loggingService {
|
||||
return &loggingService{
|
||||
logger: logger,
|
||||
underlying: underlying,
|
||||
}
|
||||
}
|
||||
|
||||
type loggingService struct {
|
||||
logger *zap.Logger
|
||||
underlying influxdb.AnnotationService
|
||||
}
|
||||
|
||||
var _ influxdb.AnnotationService = (*loggingService)(nil)
|
||||
|
||||
func (l loggingService) CreateAnnotations(ctx context.Context, orgID platform.ID, create []influxdb.AnnotationCreate) (an []influxdb.AnnotationEvent, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to create annotations", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("annotations create", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.CreateAnnotations(ctx, orgID, create)
|
||||
}
|
||||
|
||||
func (l loggingService) ListAnnotations(ctx context.Context, orgID platform.ID, filter influxdb.AnnotationListFilter) (an []influxdb.StoredAnnotation, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to find annotations", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("annotations find", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.ListAnnotations(ctx, orgID, filter)
|
||||
}
|
||||
|
||||
func (l loggingService) GetAnnotation(ctx context.Context, id platform.ID) (an *influxdb.StoredAnnotation, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to find annotation by ID", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("annotation find by ID", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.GetAnnotation(ctx, id)
|
||||
}
|
||||
|
||||
func (l loggingService) DeleteAnnotations(ctx context.Context, orgID platform.ID, delete influxdb.AnnotationDeleteFilter) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to delete annotations", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("annotations delete", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.DeleteAnnotations(ctx, orgID, delete)
|
||||
}
|
||||
|
||||
func (l loggingService) DeleteAnnotation(ctx context.Context, id platform.ID) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to delete annotation", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("annotation delete", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.DeleteAnnotation(ctx, id)
|
||||
}
|
||||
|
||||
func (l loggingService) UpdateAnnotation(ctx context.Context, id platform.ID, update influxdb.AnnotationCreate) (an *influxdb.AnnotationEvent, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to update annotation", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("annotation update", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.UpdateAnnotation(ctx, id, update)
|
||||
}
|
||||
|
||||
func (l loggingService) ListStreams(ctx context.Context, orgID platform.ID, filter influxdb.StreamListFilter) (stm []influxdb.StoredStream, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to find streams", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("streams find", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.ListStreams(ctx, orgID, filter)
|
||||
}
|
||||
|
||||
func (l loggingService) CreateOrUpdateStream(ctx context.Context, orgID platform.ID, stream influxdb.Stream) (stm *influxdb.ReadStream, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to create or update stream", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("stream create or update", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.CreateOrUpdateStream(ctx, orgID, stream)
|
||||
}
|
||||
|
||||
func (l loggingService) UpdateStream(ctx context.Context, id platform.ID, stream influxdb.Stream) (stm *influxdb.ReadStream, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to update stream", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("stream update", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.UpdateStream(ctx, id, stream)
|
||||
}
|
||||
|
||||
func (l loggingService) GetStream(ctx context.Context, id platform.ID) (stm *influxdb.StoredStream, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to find stream by ID", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("stream find by ID", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.GetStream(ctx, id)
|
||||
}
|
||||
|
||||
func (l loggingService) DeleteStreams(ctx context.Context, orgID platform.ID, delete influxdb.BasicStream) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to delete streams", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("streams delete", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.DeleteStreams(ctx, orgID, delete)
|
||||
}
|
||||
|
||||
func (l loggingService) DeleteStreamByID(ctx context.Context, id platform.ID) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
l.logger.Debug("failed to delete stream", zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("stream delete", dur)
|
||||
}(time.Now())
|
||||
return l.underlying.DeleteStreamByID(ctx, id)
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
package annotations
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/metric"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
func NewMetricCollectingService(reg prometheus.Registerer, underlying influxdb.AnnotationService, opts ...metric.ClientOptFn) *metricsService {
|
||||
o := metric.ApplyMetricOpts(opts...)
|
||||
return &metricsService{
|
||||
rec: metric.New(reg, o.ApplySuffix("annotation")),
|
||||
underlying: underlying,
|
||||
}
|
||||
}
|
||||
|
||||
type metricsService struct {
|
||||
// RED metrics
|
||||
rec *metric.REDClient
|
||||
underlying influxdb.AnnotationService
|
||||
}
|
||||
|
||||
var _ influxdb.AnnotationService = (*metricsService)(nil)
|
||||
|
||||
func (m metricsService) CreateAnnotations(ctx context.Context, orgID platform.ID, create []influxdb.AnnotationCreate) ([]influxdb.AnnotationEvent, error) {
|
||||
rec := m.rec.Record("create_annotation")
|
||||
ans, err := m.underlying.CreateAnnotations(ctx, orgID, create)
|
||||
return ans, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) ListAnnotations(ctx context.Context, orgID platform.ID, filter influxdb.AnnotationListFilter) ([]influxdb.StoredAnnotation, error) {
|
||||
rec := m.rec.Record("find_annotations")
|
||||
ans, err := m.underlying.ListAnnotations(ctx, orgID, filter)
|
||||
return ans, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) GetAnnotation(ctx context.Context, id platform.ID) (*influxdb.StoredAnnotation, error) {
|
||||
rec := m.rec.Record("find_annotation_by_id")
|
||||
an, err := m.underlying.GetAnnotation(ctx, id)
|
||||
return an, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) DeleteAnnotations(ctx context.Context, orgID platform.ID, delete influxdb.AnnotationDeleteFilter) error {
|
||||
rec := m.rec.Record("delete_annotations")
|
||||
return rec(m.underlying.DeleteAnnotations(ctx, orgID, delete))
|
||||
}
|
||||
|
||||
func (m metricsService) DeleteAnnotation(ctx context.Context, id platform.ID) error {
|
||||
rec := m.rec.Record("delete_annotation")
|
||||
return rec(m.underlying.DeleteAnnotation(ctx, id))
|
||||
}
|
||||
|
||||
func (m metricsService) UpdateAnnotation(ctx context.Context, id platform.ID, update influxdb.AnnotationCreate) (*influxdb.AnnotationEvent, error) {
|
||||
rec := m.rec.Record("update_annotation")
|
||||
an, err := m.underlying.UpdateAnnotation(ctx, id, update)
|
||||
return an, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) ListStreams(ctx context.Context, orgID platform.ID, filter influxdb.StreamListFilter) ([]influxdb.StoredStream, error) {
|
||||
rec := m.rec.Record("find_streams")
|
||||
stms, err := m.underlying.ListStreams(ctx, orgID, filter)
|
||||
return stms, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) CreateOrUpdateStream(ctx context.Context, orgID platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) {
|
||||
rec := m.rec.Record("create_or_update_stream")
|
||||
stm, err := m.underlying.CreateOrUpdateStream(ctx, orgID, stream)
|
||||
return stm, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) GetStream(ctx context.Context, id platform.ID) (*influxdb.StoredStream, error) {
|
||||
rec := m.rec.Record("find_stream_by_id")
|
||||
stm, err := m.underlying.GetStream(ctx, id)
|
||||
return stm, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) UpdateStream(ctx context.Context, id platform.ID, stream influxdb.Stream) (*influxdb.ReadStream, error) {
|
||||
rec := m.rec.Record("update_stream")
|
||||
stm, err := m.underlying.UpdateStream(ctx, id, stream)
|
||||
return stm, rec(err)
|
||||
}
|
||||
|
||||
func (m metricsService) DeleteStreams(ctx context.Context, orgID platform.ID, delete influxdb.BasicStream) error {
|
||||
rec := m.rec.Record("delete_streams")
|
||||
return rec(m.underlying.DeleteStreams(ctx, orgID, delete))
|
||||
}
|
||||
|
||||
func (m metricsService) DeleteStreamByID(ctx context.Context, id platform.ID) error {
|
||||
rec := m.rec.Record("delete_stream")
|
||||
return rec(m.underlying.DeleteStreamByID(ctx, id))
|
||||
}
|
|
@ -13,7 +13,6 @@ import (
|
|||
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
|
||||
"github.com/influxdata/influxdb/v2/snowflake"
|
||||
"github.com/influxdata/influxdb/v2/sqlite"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -31,14 +30,12 @@ var _ influxdb.AnnotationService = (*Service)(nil)
|
|||
|
||||
type Service struct {
|
||||
store *sqlite.SqlStore
|
||||
log *zap.Logger
|
||||
idGenerator platform.IDGenerator
|
||||
}
|
||||
|
||||
func NewService(logger *zap.Logger, store *sqlite.SqlStore) *Service {
|
||||
func NewService(store *sqlite.SqlStore) *Service {
|
||||
return &Service{
|
||||
store: store,
|
||||
log: logger,
|
||||
idGenerator: snowflake.NewIDGenerator(),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1031,7 +1031,7 @@ func newTestService(t *testing.T) (*Service, func(t *testing.T)) {
|
|||
err := sqliteMigrator.Up(ctx, migrations.All)
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := NewService(zap.NewNop(), store)
|
||||
svc := NewService(store)
|
||||
|
||||
return svc, clean
|
||||
}
|
||||
|
|
|
@ -952,13 +952,15 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
|
|||
),
|
||||
)
|
||||
|
||||
annotationSvc := annotations.NewService(
|
||||
m.log.With(zap.String("service", "annotations")),
|
||||
m.sqlStore,
|
||||
)
|
||||
annotationSvc := annotations.NewService(m.sqlStore)
|
||||
annotationServer := annotationTransport.NewAnnotationHandler(
|
||||
m.log.With(zap.String("handler", "annotations")),
|
||||
authorizer.NewAnnotationService(annotationSvc),
|
||||
authorizer.NewAnnotationService(
|
||||
annotations.NewLoggingService(
|
||||
m.log.With(zap.String("service", "annotations")),
|
||||
annotations.NewMetricCollectingService(m.reg, annotationSvc),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
remotesSvc := remotes.NewService(m.sqlStore)
|
||||
|
|
Loading…
Reference in New Issue