diff --git a/logger/fields.go b/logger/fields.go index 9ce7e2fa10..05750ce077 100644 --- a/logger/fields.go +++ b/logger/fields.go @@ -1,9 +1,12 @@ package logger import ( + "context" "time" "github.com/influxdata/influxdb/pkg/snowflake" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -84,13 +87,24 @@ func Shard(id uint64) zapcore.Field { return zap.Uint64(DBShardIDKey, id) } +// TraceID returns a field "trace_id", value pulled from the (Jaeger) trace ID found in the given context. +// Returns zap.Skip() if the context doesn't have a trace ID. +func TraceID(ctx context.Context) zap.Field { + if span := opentracing.SpanFromContext(ctx); span != nil { + if spanContext, ok := span.Context().(jaeger.SpanContext); ok { + return zap.String("trace_id", spanContext.TraceID().String()) + } + } + return zap.Skip() +} + // NewOperation uses the exiting log to create a new logger with context // containing a trace id and the operation. Prior to returning, a standardized message // is logged indicating the operation has started. The returned function should be // called when the operation concludes in order to log a corresponding message which // includes an elapsed time and that the operation has ended. -func NewOperation(log *zap.Logger, msg, name string, fields ...zapcore.Field) (*zap.Logger, func()) { - f := []zapcore.Field{OperationName(name)} +func NewOperation(ctx context.Context, log *zap.Logger, msg, name string, fields ...zapcore.Field) (*zap.Logger, func()) { + f := []zapcore.Field{OperationName(name), TraceID(ctx)} if len(fields) > 0 { f = append(f, fields...) } diff --git a/storage/retention.go b/storage/retention.go index 4460a46906..fd04e025ea 100644 --- a/storage/retention.go +++ b/storage/retention.go @@ -7,6 +7,7 @@ import ( "time" "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/logger" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -86,15 +87,18 @@ func (s *retentionEnforcer) run() { return // Not initialized } - log, logEnd := logger.NewOperation(s.logger, "Data retention check", "data_retention_check") + span, ctx := tracing.StartSpanFromContext(context.Background()) + defer span.Finish() + + log, logEnd := logger.NewOperation(ctx, s.logger, "Data retention check", "data_retention_check") defer logEnd() now := time.Now().UTC() - buckets, err := s.getBucketInformation() + buckets, err := s.getBucketInformation(ctx) if err != nil { log.Error("Unable to determine bucket information", zap.Error(err)) } else { - s.expireData(buckets, now) + s.expireData(ctx, buckets, now) } s.tracker.CheckDuration(time.Since(now), err == nil) } @@ -103,8 +107,8 @@ func (s *retentionEnforcer) run() { // // Any series data that (1) belongs to a bucket in the provided list and // (2) falls outside the bucket's indicated retention period will be deleted. -func (s *retentionEnforcer) expireData(buckets []*influxdb.Bucket, now time.Time) { - logger, logEnd := logger.NewOperation(s.logger, "Data deletion", "data_deletion") +func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb.Bucket, now time.Time) { + logger, logEnd := logger.NewOperation(ctx, s.logger, "Data deletion", "data_deletion") defer logEnd() for _, b := range buckets { @@ -112,6 +116,13 @@ func (s *retentionEnforcer) expireData(buckets []*influxdb.Bucket, now time.Time continue } + span, _ := tracing.StartSpanFromContext(ctx) + span.LogKV( + "bucket", b.Name, + "org_id", b.OrgID, + "retention_period", b.RetentionPeriod, + "retention_policy", b.RetentionPolicyName) + max := now.Add(-b.RetentionPeriod).UnixNano() err := s.Engine.DeleteBucketRange(b.OrgID, b.ID, math.MinInt64, max) if err != nil { @@ -119,14 +130,17 @@ func (s *retentionEnforcer) expireData(buckets []*influxdb.Bucket, now time.Time zap.String("bucket id", b.ID.String()), zap.String("org id", b.OrgID.String()), zap.Error(err)) + tracing.LogError(span, err) } s.tracker.IncChecks(b.OrgID, b.ID, err == nil) + + span.Finish() } } // getBucketInformation returns a slice of buckets to run retention on. -func (s *retentionEnforcer) getBucketInformation() ([]*influxdb.Bucket, error) { - ctx, cancel := context.WithTimeout(context.Background(), bucketAPITimeout) +func (s *retentionEnforcer) getBucketInformation(ctx context.Context) ([]*influxdb.Bucket, error) { + ctx, cancel := context.WithTimeout(ctx, bucketAPITimeout) defer cancel() buckets, _, err := s.BucketService.FindBuckets(ctx, influxdb.BucketFilter{}) diff --git a/storage/retention_test.go b/storage/retention_test.go index 8d6c744ecb..34cf9729c3 100644 --- a/storage/retention_test.go +++ b/storage/retention_test.go @@ -20,8 +20,8 @@ func TestRetentionService(t *testing.T) { now := time.Date(2018, 4, 10, 23, 12, 33, 0, time.UTC) t.Run("no buckets", func(t *testing.T) { - service.expireData(nil, now) - service.expireData([]*influxdb.Bucket{}, now) + service.expireData(context.Background(), nil, now) + service.expireData(context.Background(), []*influxdb.Bucket{}, now) }) // Generate some buckets to expire @@ -75,7 +75,7 @@ func TestRetentionService(t *testing.T) { } t.Run("multiple buckets", func(t *testing.T) { - service.expireData(buckets, now) + service.expireData(context.Background(), buckets, now) if !reflect.DeepEqual(gotMatched, expMatched) { t.Fatalf("got\n%#v\nexpected\n%#v", gotMatched, expMatched) } diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index f3455804fd..b6ec01a267 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/flux/lang" "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/task/backend" @@ -90,7 +91,7 @@ var _ backend.RunPromise = (*syncRunPromise)(nil) func newSyncRunPromise(ctx context.Context, auth *influxdb.Authorization, qr backend.QueuedRun, e *queryServiceExecutor, t *influxdb.Task) *syncRunPromise { ctx, cancel := context.WithCancel(ctx) opLogger := e.logger.With(zap.Stringer("task_id", qr.TaskID), zap.Stringer("run_id", qr.RunID)) - log, logEnd := logger.NewOperation(opLogger, "Executing task", "execute") + log, logEnd := logger.NewOperation(ctx, opLogger, "Executing task", "execute") rp := &syncRunPromise{ qr: qr, auth: auth, @@ -252,7 +253,7 @@ func (e *asyncQueryServiceExecutor) Execute(ctx context.Context, run backend.Que return nil, err } - return newAsyncRunPromise(run, q, e), nil + return newAsyncRunPromise(ctx, run, q, e), nil } func (e *asyncQueryServiceExecutor) Wait() { @@ -275,9 +276,12 @@ type asyncRunPromise struct { var _ backend.RunPromise = (*asyncRunPromise)(nil) -func newAsyncRunPromise(qr backend.QueuedRun, q flux.Query, e *asyncQueryServiceExecutor) *asyncRunPromise { +func newAsyncRunPromise(ctx context.Context, qr backend.QueuedRun, q flux.Query, e *asyncQueryServiceExecutor) *asyncRunPromise { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + opLogger := e.logger.With(zap.Stringer("task_id", qr.TaskID), zap.Stringer("run_id", qr.RunID)) - log, logEnd := logger.NewOperation(opLogger, "Executing task", "execute") + log, logEnd := logger.NewOperation(ctx, opLogger, "Executing task", "execute") p := &asyncRunPromise{ qr: qr, diff --git a/tsdb/series_file.go b/tsdb/series_file.go index b775905f00..6a1352e21e 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -93,10 +93,10 @@ func (f *SeriesFile) Open(ctx context.Context) error { return errors.New("series file already opened") } - span, _ := tracing.StartSpanFromContext(ctx) + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - _, logEnd := logger.NewOperation(f.Logger, "Opening Series File", "series_file_open", zap.String("path", f.path)) + _, logEnd := logger.NewOperation(ctx, f.Logger, "Opening Series File", "series_file_open", zap.String("path", f.path)) defer logEnd() // Create path if it doesn't exist. diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index b4d04bea00..d0af754fa3 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -1,6 +1,7 @@ package tsdb import ( + "context" "encoding/binary" "errors" "fmt" @@ -10,6 +11,7 @@ import ( "sync" "time" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/rhh" @@ -191,6 +193,9 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(collection *SeriesCollecti return ErrSeriesPartitionClosed } + span, ctx := tracing.StartSpanFromContext(context.TODO()) + defer span.Finish() + writeRequired := 0 for iter := collection.Iterator(); iter.Next(); { index := iter.Index() @@ -302,7 +307,7 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(collection *SeriesCollecti // Check if we've crossed the compaction threshold. if p.compactionsEnabled() && !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) { p.compacting = true - log, logEnd := logger.NewOperation(p.Logger, "Series partition compaction", "series_partition_compaction", zap.String("path", p.path)) + log, logEnd := logger.NewOperation(ctx, p.Logger, "Series partition compaction", "series_partition_compaction", zap.String("path", p.path)) p.wg.Add(1) p.tracker.IncCompactionsActive() diff --git a/tsdb/tsi1/partition.go b/tsdb/tsi1/partition.go index 865e4c8b91..01abfb29b5 100644 --- a/tsdb/tsi1/partition.go +++ b/tsdb/tsi1/partition.go @@ -1,6 +1,7 @@ package tsi1 import ( + "context" "encoding/json" "errors" "fmt" @@ -14,6 +15,7 @@ import ( "time" "unsafe" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/lifecycle" @@ -1007,8 +1009,11 @@ func (p *Partition) compactToLevel(files []*IndexFile, frefs lifecycle.Reference p.tracker.CompactionAttempted(level, success, time.Since(start)) }() + span, ctx := tracing.StartSpanFromContext(context.Background()) + defer span.Finish() + // Build a logger for this compaction. - log, logEnd := logger.NewOperation(p.logger, "TSI level compaction", "tsi1_compact_to_level", zap.Int("tsi1_level", level)) + log, logEnd := logger.NewOperation(ctx, p.logger, "TSI level compaction", "tsi1_compact_to_level", zap.Int("tsi1_level", level)) defer logEnd() // Check for cancellation. @@ -1144,6 +1149,9 @@ func (p *Partition) checkLogFile() error { return nil } + span, ctx := tracing.StartSpanFromContext(context.Background()) + defer span.Finish() + // Swap current log file. logFile := p.activeLogFile @@ -1156,7 +1164,7 @@ func (p *Partition) checkLogFile() error { // Begin compacting in a background goroutine. p.compactionsWG.Add(1) go func() { - p.compactLogFile(logFile, ref.Closing()) + p.compactLogFile(ctx, logFile, ref.Closing()) ref.Release() // release our reference p.compactionsWG.Done() // compaction is now complete p.Compact() // check for new compactions @@ -1168,7 +1176,7 @@ func (p *Partition) checkLogFile() error { // compactLogFile compacts f into a tsi file. The new file will share the // same identifier but will have a ".tsi" extension. Once the log file is // compacted then the manifest is updated and the log file is discarded. -func (p *Partition) compactLogFile(logFile *LogFile, interrupt <-chan struct{}) { +func (p *Partition) compactLogFile(ctx context.Context, logFile *LogFile, interrupt <-chan struct{}) { defer func() { p.mu.RLock() defer p.mu.RUnlock() @@ -1184,7 +1192,7 @@ func (p *Partition) compactLogFile(logFile *LogFile, interrupt <-chan struct{}) assert(id != 0, "cannot parse log file id: %s", logFile.Path()) // Build a logger for this compaction. - log, logEnd := logger.NewOperation(p.logger, "TSI log compaction", "tsi1_compact_log_file", zap.Int("tsi1_log_file_id", id)) + log, logEnd := logger.NewOperation(ctx, p.logger, "TSI log compaction", "tsi1_compact_log_file", zap.Int("tsi1_log_file_id", id)) defer logEnd() // Create new index file. diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 263207bef2..ac646e61d5 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -779,7 +779,7 @@ func (e *Engine) WriteSnapshot(ctx context.Context) error { started := time.Now() - log, logEnd := logger.NewOperation(e.logger, "Cache snapshot", "tsm1_cache_snapshot") + log, logEnd := logger.NewOperation(ctx, e.logger, "Cache snapshot", "tsm1_cache_snapshot") defer func() { elapsed := time.Since(started) log.Info("Snapshot for path written", @@ -1115,12 +1115,12 @@ func (s *compactionStrategy) Apply(ctx context.Context) { // compactGroup executes the compaction strategy against a single CompactionGroup. func (s *compactionStrategy) compactGroup(ctx context.Context) { - span, _ := tracing.StartSpanFromContext(ctx) + span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() now := time.Now() group := s.group - log, logEnd := logger.NewOperation(s.logger, "TSM compaction", "tsm1_compact_group") + log, logEnd := logger.NewOperation(ctx, s.logger, "TSM compaction", "tsm1_compact_group") defer logEnd() log.Info("Beginning compaction", zap.Int("tsm1_files_n", len(group)))