From e2f5b2bd9d9e152ade18fcf659ff7f8dad725d5e Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 19 Sep 2019 13:48:06 +0100 Subject: [PATCH] refactor(storage): add more context to traces and logs --- storage/retention.go | 46 +++++++++++++++++++++++-------- tsdb/tsm1/engine_delete_prefix.go | 11 ++++++-- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/storage/retention.go b/storage/retention.go index a8615f4f60..8dc2ce4b96 100644 --- a/storage/retention.go +++ b/storage/retention.go @@ -6,13 +6,13 @@ import ( "math" "time" - "github.com/influxdata/influxdb/tsdb/tsm1" - "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/logger" + "github.com/influxdata/influxdb/tsdb/tsm1" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) const ( @@ -118,7 +118,8 @@ func (s *retentionEnforcer) run() { // Any series data that (1) belongs to a bucket in the provided list and // (2) falls outside the bucket's indicated retention period will be deleted. func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb.Bucket, now time.Time) { - logger, logEnd := logger.NewOperation(ctx, s.logger, "Data deletion", "data_deletion") + logger, logEnd := logger.NewOperation(ctx, s.logger, "Data deletion", "data_deletion", + zap.Int("buckets", len(buckets))) defer logEnd() // Snapshot to clear the cache to reduce write contention. @@ -126,31 +127,52 @@ func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb. logger.Warn("Unable to snapshot cache before retention", zap.Error(err)) } + var skipInf, skipInvalid int for _, b := range buckets { + bucketFields := []zapcore.Field{ + zap.String("org_id", b.OrgID.String()), + zap.String("bucket_id", b.ID.String()), + zap.Duration("retention_period", b.RetentionPeriod), + zap.Bool("system", b.IsSystem()), + } + if b.RetentionPeriod == 0 { + logger.Debug("Skipping bucket with infinite retention", bucketFields...) + skipInf++ + continue + } else if !b.OrgID.Valid() || !b.ID.Valid() { + skipInvalid++ + logger.Warn("Skipping bucket with invalid fields", bucketFields...) continue } + min := int64(math.MinInt64) + max := now.Add(-b.RetentionPeriod).UnixNano() + span, ctx := tracing.StartSpanFromContext(ctx) span.LogKV( - "bucket", b.Name, + "bucket", b.ID, "org_id", b.OrgID, + "system", b.IsSystem(), "retention_period", b.RetentionPeriod, - "retention_policy", b.RetentionPolicyName) + "retention_policy", b.RetentionPolicyName, + "from", time.Unix(0, min).UTC(), + "to", time.Unix(0, max).UTC(), + ) - max := now.Add(-b.RetentionPeriod).UnixNano() - err := s.Engine.DeleteBucketRange(ctx, b.OrgID, b.ID, math.MinInt64, max) + err := s.Engine.DeleteBucketRange(ctx, b.OrgID, b.ID, min, max) if err != nil { - logger.Info("unable to delete bucket range", - zap.String("bucket id", b.ID.String()), - zap.String("org id", b.OrgID.String()), - zap.Error(err)) + logger.Info("Unable to delete bucket range", + append(bucketFields, zap.Time("min", time.Unix(0, min)), zap.Time("max", time.Unix(0, max)), zap.Error(err))...) tracing.LogError(span, err) } s.tracker.IncChecks(err == nil) - span.Finish() } + + if skipInf > 0 || skipInvalid > 0 { + logger.Info("Skipped buckets", zap.Int("infinite_retention_total", skipInf), zap.Int("invalid_total", skipInvalid)) + } } // getBucketInformation returns a slice of buckets to run retention on. diff --git a/tsdb/tsm1/engine_delete_prefix.go b/tsdb/tsm1/engine_delete_prefix.go index 4ea512d777..64fe90e052 100644 --- a/tsdb/tsm1/engine_delete_prefix.go +++ b/tsdb/tsm1/engine_delete_prefix.go @@ -3,9 +3,11 @@ package tsm1 import ( "bytes" "context" + "fmt" "math" "strings" "sync" + "time" "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/models" @@ -18,6 +20,10 @@ import ( // that only bucket data for that range is removed. func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, max int64, pred Predicate) error { span, ctx := tracing.StartSpanFromContext(rootCtx) + span.LogKV("name_prefix", fmt.Sprintf("%x", name), + "min", time.Unix(0, min), "max", time.Unix(0, max), + "has_pred", pred != nil, + ) defer span.Finish() // TODO(jeff): we need to block writes to this prefix while deletes are in progress // otherwise we can end up in a situation where we have staged data in the cache or @@ -219,6 +225,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma // Remove the measurement from the index before the series file. span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "TSI drop measurement") + span.LogKV("measurement_name", fmt.Sprintf("%x", name)) if err := e.index.DropMeasurement(name); err != nil { return err } @@ -227,7 +234,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma // Iterate over the series ids we previously extracted from the index // and remove from the series file. span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "SFile Delete Series IDs") - span.LogKV("series_id_set_size", set.Cardinality()) + span.LogKV("measurement_name", fmt.Sprintf("%x", name), "series_id_set_size", set.Cardinality()) set.ForEachNoLock(func(id tsdb.SeriesID) { if err = e.sfile.DeleteSeriesID(id); err != nil { return @@ -239,7 +246,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma // This is the slow path, when not dropping the entire bucket (measurement) span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "TSI/SFile Delete keys") - span.LogKV("keys_to_delete", len(possiblyDead.keys)) + span.LogKV("measurement_name", fmt.Sprintf("%x", name), "keys_to_delete", len(possiblyDead.keys)) for key := range possiblyDead.keys { // TODO(jeff): ugh reduce copies here keyb := []byte(key)