From 93c8a2a104d27935d91bc310c8a2e5d27f2f481b Mon Sep 17 00:00:00 2001 From: Johnny Steenbergen Date: Wed, 15 Jan 2020 11:00:31 -0800 Subject: [PATCH] feat(pkger): add service logging and tracing middlewares --- cmd/influxd/launcher/launcher.go | 2 + pkger/service.go | 3 + pkger/service_logging.go | 99 ++++++++++++++++++++++++++++++++ pkger/service_tracing.go | 41 +++++++++++++ 4 files changed, 145 insertions(+) create mode 100644 pkger/service_logging.go create mode 100644 pkger/service_tracing.go diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index befb9c0032..184779b9d9 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -827,6 +827,8 @@ func (m *Launcher) run(ctx context.Context) (err error) { pkger.WithTelegrafSVC(authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService)), pkger.WithVariableSVC(authorizer.NewVariableService(b.VariableService)), ) + pkgSVC = pkger.MWTracing()(pkgSVC) + pkgSVC = pkger.MWLogging(pkgerLogger)(pkgSVC) } var pkgHTTPServer *http.HandlerPkg diff --git a/pkger/service.go b/pkger/service.go index 4d8ae632ab..c73b1cbbbf 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -24,6 +24,9 @@ type SVC interface { Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, error) } +// SVCMiddleware is a service middleware func. +type SVCMiddleware func(SVC) SVC + type serviceOpt struct { logger *zap.Logger diff --git a/pkger/service_logging.go b/pkger/service_logging.go new file mode 100644 index 0000000000..a3c91c2904 --- /dev/null +++ b/pkger/service_logging.go @@ -0,0 +1,99 @@ +package pkger + +import ( + "context" + "time" + + "github.com/influxdata/influxdb" + "go.uber.org/zap" +) + +type loggingMW struct { + logger *zap.Logger + next SVC +} + +// MWLogging adds logging functionality for the service. +func MWLogging(log *zap.Logger) SVCMiddleware { + return func(svc SVC) SVC { + return &loggingMW{ + logger: log, + next: svc, + } + } +} + +var _ SVC = (*loggingMW)(nil) + +func (s *loggingMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg *Pkg, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + s.logger.Error("failed to create pkg", zap.Error(err), dur) + return + } + s.logger.Info("pkg create", append(s.summaryLogFields(pkg.Summary()), dur)...) + }(time.Now()) + return s.next.CreatePkg(ctx, setters...) +} + +func (s *loggingMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg) (sum Summary, diff Diff, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + s.logger.Error("failed to dry run pkg", + zap.String("orgID", orgID.String()), + zap.String("userID", userID.String()), + zap.Error(err), + dur, + ) + return + } + s.logger.Info("pkg dry run successful", append(s.summaryLogFields(sum), dur)...) + }(time.Now()) + return s.next.DryRun(ctx, orgID, userID, pkg) +} + +func (s *loggingMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + s.logger.Error("failed to apply pkg", + zap.String("orgID", orgID.String()), + zap.String("userID", userID.String()), + zap.Error(err), + dur, + ) + } + s.logger.Info("pkg apply successful", append(s.summaryLogFields(sum), dur)...) + }(time.Now()) + return s.next.Apply(ctx, orgID, userID, pkg, opts...) +} + +func (s *loggingMW) summaryLogFields(sum Summary) []zap.Field { + potentialFields := []struct { + key string + val int + }{ + {key: "buckets", val: len(sum.Buckets)}, + {key: "checks", val: len(sum.Checks)}, + {key: "dashboards", val: len(sum.Dashboards)}, + {key: "endpoints", val: len(sum.NotificationEndpoints)}, + {key: "labels", val: len(sum.Labels)}, + {key: "label_mappings", val: len(sum.LabelMappings)}, + {key: "rules", val: len(sum.NotificationRules)}, + {key: "secrets", val: len(sum.MissingSecrets)}, + {key: "tasks", val: len(sum.Tasks)}, + {key: "telegrafs", val: len(sum.TelegrafConfigs)}, + {key: "variables", val: len(sum.Variables)}, + } + + var fields []zap.Field + for _, f := range potentialFields { + if f.val > 0 { + fields = append(fields, zap.Int("num_"+f.key, f.val)) + } + } + + return fields +} diff --git a/pkger/service_tracing.go b/pkger/service_tracing.go new file mode 100644 index 0000000000..8036cae536 --- /dev/null +++ b/pkger/service_tracing.go @@ -0,0 +1,41 @@ +package pkger + +import ( + "context" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kit/tracing" +) + +type traceMW struct { + next SVC +} + +// MWTracing adds tracing functionality for the service. +func MWTracing() SVCMiddleware { + return func(svc SVC) SVC { + return &traceMW{next: svc} + } +} + +var _ SVC = (*traceMW)(nil) + +func (s *traceMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg *Pkg, err error) { + span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "CreatePkg") + defer span.Finish() + return s.next.CreatePkg(ctx, setters...) +} + +func (s *traceMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg) (sum Summary, diff Diff, err error) { + span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "DryRun") + span.LogKV("orgID", orgID.String(), "userID", userID.String()) + defer span.Finish() + return s.next.DryRun(ctx, orgID, userID, pkg) +} + +func (s *traceMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, err error) { + span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "Apply") + span.LogKV("orgID", orgID.String(), "userID", userID.String()) + defer span.Finish() + return s.next.Apply(ctx, orgID, userID, pkg, opts...) +}