refactor(storage): add more context to traces and logs
parent
ff7d13a10a
commit
e2f5b2bd9d
|
|
@ -6,13 +6,13 @@ import (
|
|||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/tsdb/tsm1"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
"github.com/influxdata/influxdb/logger"
|
||||
"github.com/influxdata/influxdb/tsdb/tsm1"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -118,7 +118,8 @@ func (s *retentionEnforcer) run() {
|
|||
// Any series data that (1) belongs to a bucket in the provided list and
|
||||
// (2) falls outside the bucket's indicated retention period will be deleted.
|
||||
func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb.Bucket, now time.Time) {
|
||||
logger, logEnd := logger.NewOperation(ctx, s.logger, "Data deletion", "data_deletion")
|
||||
logger, logEnd := logger.NewOperation(ctx, s.logger, "Data deletion", "data_deletion",
|
||||
zap.Int("buckets", len(buckets)))
|
||||
defer logEnd()
|
||||
|
||||
// Snapshot to clear the cache to reduce write contention.
|
||||
|
|
@ -126,31 +127,52 @@ func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb.
|
|||
logger.Warn("Unable to snapshot cache before retention", zap.Error(err))
|
||||
}
|
||||
|
||||
var skipInf, skipInvalid int
|
||||
for _, b := range buckets {
|
||||
bucketFields := []zapcore.Field{
|
||||
zap.String("org_id", b.OrgID.String()),
|
||||
zap.String("bucket_id", b.ID.String()),
|
||||
zap.Duration("retention_period", b.RetentionPeriod),
|
||||
zap.Bool("system", b.IsSystem()),
|
||||
}
|
||||
|
||||
if b.RetentionPeriod == 0 {
|
||||
logger.Debug("Skipping bucket with infinite retention", bucketFields...)
|
||||
skipInf++
|
||||
continue
|
||||
} else if !b.OrgID.Valid() || !b.ID.Valid() {
|
||||
skipInvalid++
|
||||
logger.Warn("Skipping bucket with invalid fields", bucketFields...)
|
||||
continue
|
||||
}
|
||||
|
||||
min := int64(math.MinInt64)
|
||||
max := now.Add(-b.RetentionPeriod).UnixNano()
|
||||
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
span.LogKV(
|
||||
"bucket", b.Name,
|
||||
"bucket", b.ID,
|
||||
"org_id", b.OrgID,
|
||||
"system", b.IsSystem(),
|
||||
"retention_period", b.RetentionPeriod,
|
||||
"retention_policy", b.RetentionPolicyName)
|
||||
"retention_policy", b.RetentionPolicyName,
|
||||
"from", time.Unix(0, min).UTC(),
|
||||
"to", time.Unix(0, max).UTC(),
|
||||
)
|
||||
|
||||
max := now.Add(-b.RetentionPeriod).UnixNano()
|
||||
err := s.Engine.DeleteBucketRange(ctx, b.OrgID, b.ID, math.MinInt64, max)
|
||||
err := s.Engine.DeleteBucketRange(ctx, b.OrgID, b.ID, min, max)
|
||||
if err != nil {
|
||||
logger.Info("unable to delete bucket range",
|
||||
zap.String("bucket id", b.ID.String()),
|
||||
zap.String("org id", b.OrgID.String()),
|
||||
zap.Error(err))
|
||||
logger.Info("Unable to delete bucket range",
|
||||
append(bucketFields, zap.Time("min", time.Unix(0, min)), zap.Time("max", time.Unix(0, max)), zap.Error(err))...)
|
||||
tracing.LogError(span, err)
|
||||
}
|
||||
s.tracker.IncChecks(err == nil)
|
||||
|
||||
span.Finish()
|
||||
}
|
||||
|
||||
if skipInf > 0 || skipInvalid > 0 {
|
||||
logger.Info("Skipped buckets", zap.Int("infinite_retention_total", skipInf), zap.Int("invalid_total", skipInvalid))
|
||||
}
|
||||
}
|
||||
|
||||
// getBucketInformation returns a slice of buckets to run retention on.
|
||||
|
|
|
|||
|
|
@ -3,9 +3,11 @@ package tsm1
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
|
|
@ -18,6 +20,10 @@ import (
|
|||
// that only bucket data for that range is removed.
|
||||
func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, max int64, pred Predicate) error {
|
||||
span, ctx := tracing.StartSpanFromContext(rootCtx)
|
||||
span.LogKV("name_prefix", fmt.Sprintf("%x", name),
|
||||
"min", time.Unix(0, min), "max", time.Unix(0, max),
|
||||
"has_pred", pred != nil,
|
||||
)
|
||||
defer span.Finish()
|
||||
// TODO(jeff): we need to block writes to this prefix while deletes are in progress
|
||||
// otherwise we can end up in a situation where we have staged data in the cache or
|
||||
|
|
@ -219,6 +225,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
|
|||
|
||||
// Remove the measurement from the index before the series file.
|
||||
span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "TSI drop measurement")
|
||||
span.LogKV("measurement_name", fmt.Sprintf("%x", name))
|
||||
if err := e.index.DropMeasurement(name); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -227,7 +234,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
|
|||
// Iterate over the series ids we previously extracted from the index
|
||||
// and remove from the series file.
|
||||
span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "SFile Delete Series IDs")
|
||||
span.LogKV("series_id_set_size", set.Cardinality())
|
||||
span.LogKV("measurement_name", fmt.Sprintf("%x", name), "series_id_set_size", set.Cardinality())
|
||||
set.ForEachNoLock(func(id tsdb.SeriesID) {
|
||||
if err = e.sfile.DeleteSeriesID(id); err != nil {
|
||||
return
|
||||
|
|
@ -239,7 +246,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
|
|||
|
||||
// This is the slow path, when not dropping the entire bucket (measurement)
|
||||
span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "TSI/SFile Delete keys")
|
||||
span.LogKV("keys_to_delete", len(possiblyDead.keys))
|
||||
span.LogKV("measurement_name", fmt.Sprintf("%x", name), "keys_to_delete", len(possiblyDead.keys))
|
||||
for key := range possiblyDead.keys {
|
||||
// TODO(jeff): ugh reduce copies here
|
||||
keyb := []byte(key)
|
||||
|
|
|
|||
Loading…
Reference in New Issue