Generate trace logs for a number of significant influx operations

* tsdb Store.Open traces all events related to opening files
    * op.name: tsdb.open
* retention policy shard deletions
    * op.name: retention.delete_check
* all TSM compaction strategies
    * op.name: tsm1.compact_group
* series file compactions
    * op.name: series_partition.compaction
* continuous query execution (if logging enabled)
    * op.name: continuous_querier.execute
* TSI log file compaction
    * op.name: index.tsi.compact_log_file
* TSI level compaction
    * op.name: index.tsi.compact_to_level
pull/9456/head
Stuart Carnie 2018-02-21 13:08:44 -07:00
parent 290717bd04
commit d135aecf02
20 changed files with 453 additions and 116 deletions
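
All of these operations emit paired start/end events that share a generated `trace_id`, using the field keys added in `logger/fields.go` below (`trace_id`, `op.name`, `op.event`, `op.elapsed`). As a rough illustration of how the events can be correlated afterwards (assuming the logs are written with a JSON encoder, which is a deployment choice rather than something this change requires), a minimal filter might look like the following sketch:

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
)

// Reads JSON-encoded log lines from stdin and prints the events belonging to
// one operation type, grouped by trace_id. The key names come from
// logger/fields.go in this change; the JSON encoding itself is an assumption.
func main() {
	opName := os.Args[1] // e.g. "tsm1.compact_group"
	byTrace := map[string][]map[string]interface{}{}

	sc := bufio.NewScanner(os.Stdin)
	for sc.Scan() {
		var entry map[string]interface{}
		if err := json.Unmarshal(sc.Bytes(), &entry); err != nil {
			continue // not a JSON log line
		}
		if entry["op.name"] != opName {
			continue
		}
		id, _ := entry["trace_id"].(string)
		byTrace[id] = append(byTrace[id], entry)
	}

	for id, events := range byTrace {
		fmt.Printf("trace %s:\n", id)
		for _, e := range events {
			fmt.Printf("  op.event=%v op.elapsed=%v\n", e["op.event"], e["op.elapsed"])
		}
	}
}
```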


@ -115,7 +115,7 @@ func (cmd *Command) processDatabase(dbName, dataDir, walDir string) error {
}
func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpName, dataDir, walDir string) error {
cmd.Logger.Info("rebuilding retention policy", zap.String("db", dbName), zap.String("rp", rpName))
cmd.Logger.Info("rebuilding retention policy", logger.Database(dbName), logger.RetentionPolicy(rpName))
fis, err := ioutil.ReadDir(dataDir)
if err != nil {
@ -142,7 +142,7 @@ func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpNam
}
func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string, shardID uint64, dataDir, walDir string) error {
cmd.Logger.Info("rebuilding shard", zap.String("db", dbName), zap.String("rp", rpName), zap.Uint64("shard", shardID))
cmd.Logger.Info("rebuilding shard", logger.Database(dbName), logger.RetentionPolicy(rpName), logger.Shard(shardID))
// Check if shard already has a TSI index.
indexPath := filepath.Join(dataDir, "index")

logger/fields.go (new file, 111 lines)

@ -0,0 +1,111 @@
package logger
import (
"time"
"github.com/influxdata/influxdb/pkg/snowflake"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
const (
// TraceIDKey is the logging context key used for identifying unique traces.
TraceIDKey = "trace_id"
// OperationNameKey is the logging context key used for identifying the name of an operation.
OperationNameKey = "op.name"
// OperationEventKey is the logging context key used for identifying a notable
// event during the course of an operation.
OperationEventKey = "op.event"
// OperationElapsedKey is the logging context key used for identifying the time elapsed to finish an operation.
OperationElapsedKey = "op.elapsed"
// DBInstanceKey is the logging context key used for identifying the name of the relevant database.
DBInstanceKey = "db.instance"
// DBRetentionKey is the logging context key used for identifying the name of the relevant retention policy.
DBRetentionKey = "db.rp"
// DBShardGroupKey is the logging context key used for identifying the relevant shard group.
DBShardGroupKey = "db.shard_group"
// DBShardIDKey is the logging context key used for identifying the relevant shard.
DBShardIDKey = "db.shard_id"
)
const (
eventStart = "start"
eventEnd = "end"
)
var (
gen = snowflake.New(0)
)
func nextID() string {
return gen.NextString()
}
// TraceID returns a field for tracking the trace identifier.
func TraceID(id string) zapcore.Field {
return zap.String(TraceIDKey, id)
}
// OperationName returns a field for tracking the name of an operation.
func OperationName(name string) zapcore.Field {
return zap.String(OperationNameKey, name)
}
// OperationElapsed returns a field for tracking the duration of an operation.
func OperationElapsed(d time.Duration) zapcore.Field {
return zap.Duration(OperationElapsedKey, d)
}
// OperationEventStart returns a field for tracking the start of an operation.
func OperationEventStart() zapcore.Field {
return zap.String(OperationEventKey, eventStart)
}
// OperationEventEnd returns a field for tracking the end of an operation.
func OperationEventEnd() zapcore.Field {
return zap.String(OperationEventKey, eventEnd)
}
// Database returns a field for tracking the name of a database.
func Database(name string) zapcore.Field {
return zap.String(DBInstanceKey, name)
}
// RetentionPolicy returns a field for tracking the name of a retention policy.
func RetentionPolicy(name string) zapcore.Field {
return zap.String(DBRetentionKey, name)
}
// ShardGroup returns a field for tracking the shard group identifier.
func ShardGroup(id uint64) zapcore.Field {
return zap.Uint64(DBShardGroupKey, id)
}
// Shard returns a field for tracking the shard identifier.
func Shard(id uint64) zapcore.Field {
return zap.Uint64(DBShardIDKey, id)
}
// NewOperation uses the existing logger to create a new logger with context
// containing a trace id and the operation name. Prior to returning, a standardized message
// is logged indicating the operation has started. The returned function should be
// called when the operation concludes in order to log a corresponding message which
// includes the elapsed time and indicates the operation has ended.
func NewOperation(log *zap.Logger, msg, name string, fields ...zapcore.Field) (*zap.Logger, func()) {
f := []zapcore.Field{TraceID(nextID()), OperationName(name)}
if len(fields) > 0 {
f = append(f, fields...)
}
now := time.Now()
log = log.With(f...)
log.Info(msg+" (start)", OperationEventStart())
return log, func() { log.Info(msg+" (end)", OperationEventEnd(), OperationElapsed(time.Since(now))) }
}
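
A minimal call-site sketch of this API, mirroring how the retention service below uses it (the wrapper function and its arguments are illustrative):

```go
package example

import (
	"github.com/influxdata/influxdb/logger"
	"go.uber.org/zap"
)

// runDeletionCheck is an illustrative wrapper; the message and the
// "retention.delete_check" op name are the ones the retention service in
// this change uses.
func runDeletionCheck(base *zap.Logger, db, rp string, groupID uint64) {
	log, logEnd := logger.NewOperation(base,
		"Retention policy deletion check", "retention.delete_check")
	defer logEnd() // emits the end event with op.elapsed

	// Everything logged through the returned logger carries the same
	// trace_id and op.name, so all events of one run can be correlated.
	log.Info("Deleted shard group",
		logger.Database(db),
		logger.RetentionPolicy(rp),
		logger.ShardGroup(groupID))
}
```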


@ -45,7 +45,7 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) {
encoder,
zapcore.Lock(zapcore.AddSync(w)),
c.Level,
)), nil
), zap.Fields(zap.String("log_id", nextID()))), nil
}
func newEncoder(format string) (zapcore.Encoder, error) {


@ -390,7 +390,7 @@ func (m *Monitor) createInternalStorage() {
}
if _, err := m.MetaClient.CreateDatabaseWithRetentionPolicy(m.storeDatabase, &spec); err != nil {
m.Logger.Info("Failed to create storage", zap.String("db", m.storeDatabase), zap.Error(err))
m.Logger.Info("Failed to create storage", logger.Database(m.storeDatabase), zap.Error(err))
return
}
}
@ -417,7 +417,7 @@ func (m *Monitor) waitUntilInterval(d time.Duration) error {
// storeStatistics writes the statistics to an InfluxDB system.
func (m *Monitor) storeStatistics() {
defer m.wg.Done()
m.Logger.Info("Storing statistics", zap.String("db", m.storeDatabase), zap.String("rp", m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval))
m.Logger.Info("Storing statistics", logger.Database(m.storeDatabase), logger.RetentionPolicy(m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval))
// Wait until an even interval to start recording monitor statistics.
// If we are interrupted before the interval for some reason, exit early.

pkg/snowflake/README.md (new file, 38 lines)

@ -0,0 +1,38 @@
Snowflake ID generator
======================
This is a Go implementation of [Twitter Snowflake](https://blog.twitter.com/2010/announcing-snowflake).
The most useful property of these IDs is that they are _roughly_ sortable: IDs generated
at roughly the same time have values in close numerical proximity to each other.
IDs
---
Each ID is a 64-bit number, structured as follows:
```
6  6         5         4         3         2         1
3210987654321098765432109876543210987654321098765432109876543210
ttttttttttttttttttttttttttttttttttttttttttmmmmmmmmmmssssssssssss
```
where
* s (sequence) is a 12-bit integer that increments if called multiple times within the same millisecond
* m (machine id) is a 10-bit integer representing the server id
* t (time) is a 42-bit integer representing the number of milliseconds elapsed since the custom epoch of 1491696000000, or 2017-04-09T00:00:00Z
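
As a sketch of that layout, an ID can be split back into its fields with the same epoch and bit widths used by gen.go (the `decompose` helper is illustrative and not part of the package):

```go
package main

import "fmt"

// decompose splits a raw ID back into the fields described above, using the
// same epoch and bit widths as pkg/snowflake/gen.go.
func decompose(id uint64) (timestampMillis uint64, machineID, sequence int) {
	const (
		epoch        = 1491696000000
		serverBits   = 10
		sequenceBits = 12
	)
	sequence = int(id & ((1 << sequenceBits) - 1))
	machineID = int((id >> sequenceBits) & ((1 << serverBits) - 1))
	timestampMillis = (id >> (sequenceBits + serverBits)) + epoch
	return timestampMillis, machineID, sequence
}

func main() {
	// For id = (1 << 22) | 1: time field is 1ms after the epoch,
	// machine id 0, sequence 1.
	fmt.Println(decompose((1 << 22) | 1)) // 1491696000001 0 1
}
```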
### String Encoding
The 64-bit unsigned integer is base-64 encoded using the following 64 URL-safe characters, which are ordered
according to their ASCII value.
```
0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz~
```
A byte-wise (binary) sort of a list of encoded values orders them identically to their underlying numerical values.
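
Decoding is not needed by the generator itself, but a hypothetical decoder makes the ordering property concrete: every encoded value is exactly 11 characters and the alphabet is listed in ascending ASCII order, so a byte-wise comparison of two encoded strings compares the underlying integers from the most significant 6-bit group down.

```go
package main

import (
	"fmt"
	"strings"
)

// The same 64-character, ASCII-ordered alphabet used by encode in gen.go.
const alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz~"

// decode is a hypothetical inverse of the package's encode function: each of
// the 11 characters maps back to a 6-bit value, most significant group first.
func decode(s string) (uint64, error) {
	if len(s) != 11 {
		return 0, fmt.Errorf("expected 11 characters, got %d", len(s))
	}
	var n uint64
	for i := 0; i < len(s); i++ {
		v := strings.IndexByte(alphabet, s[i])
		if v < 0 {
			return 0, fmt.Errorf("invalid character %q", s[i])
		}
		n = n<<6 | uint64(v)
	}
	return n, nil
}

func main() {
	n, _ := decode("0000000000~")
	fmt.Println(n) // 63, matching the {0x03f, "0000000000~"} case in gen_test.go
}
```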

pkg/snowflake/gen.go (new file, 107 lines)

@ -0,0 +1,107 @@
package snowflake
import (
"fmt"
"sync"
"time"
)
const (
epoch = 1491696000000
serverBits = 10
sequenceBits = 12
serverShift = sequenceBits
timeShift = sequenceBits + serverBits
serverMax = ^(-1 << serverBits)
sequenceMask = ^(-1 << sequenceBits)
)
// Generator produces unique, roughly sortable 64-bit IDs composed of a 42-bit
// timestamp, a 10-bit machine id and a 12-bit per-millisecond sequence.
type Generator struct {
rw sync.Mutex
lastTimestamp uint64
machineID int
sequence int32
}
// New returns a Generator for the given machine id, which must be in the
// range [0, serverMax].
func New(machineID int) *Generator {
if machineID < 0 || machineID > serverMax {
panic(fmt.Errorf("invalid machine id; must be 0 ≤ id ≤ %d", serverMax))
}
return &Generator{
machineID: machineID,
lastTimestamp: 0,
sequence: 0,
}
}
func (g *Generator) MachineID() int {
return g.machineID
}
// Next returns the next unique ID. It is safe for concurrent use.
func (g *Generator) Next() uint64 {
t := now()
g.rw.Lock()
if t == g.lastTimestamp {
g.sequence = (g.sequence + 1) & sequenceMask
if g.sequence == 0 {
t = g.nextMillis()
}
} else if t < g.lastTimestamp {
t = g.nextMillis()
} else {
g.sequence = 0
}
g.lastTimestamp = t
seq := g.sequence
g.rw.Unlock()
tp := (t - epoch) << timeShift
sp := uint64(g.machineID << serverShift)
n := tp | sp | uint64(seq)
return n
}
// NextString returns the next ID encoded as an 11-character, sortable string.
func (g *Generator) NextString() string {
var s [11]byte
encode(&s, g.Next())
return string(s[:])
}
func (g *Generator) AppendNext(s *[11]byte) {
encode(s, g.Next())
}
func (g *Generator) nextMillis() uint64 {
t := now()
for t <= g.lastTimestamp {
time.Sleep(100 * time.Microsecond)
t = now()
}
return t
}
func now() uint64 { return uint64(time.Now().UnixNano() / 1e6) }
var digits = [...]byte{
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
'U', 'V', 'W', 'X', 'Y', 'Z', '_', 'a', 'b', 'c',
'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w',
'x', 'y', 'z', '~'}
// encode writes the base-64 (6 bits per character) representation of n into s,
// most significant group first.
func encode(s *[11]byte, n uint64) {
s[10], n = digits[n&0x3f], n>>6
s[9], n = digits[n&0x3f], n>>6
s[8], n = digits[n&0x3f], n>>6
s[7], n = digits[n&0x3f], n>>6
s[6], n = digits[n&0x3f], n>>6
s[5], n = digits[n&0x3f], n>>6
s[4], n = digits[n&0x3f], n>>6
s[3], n = digits[n&0x3f], n>>6
s[2], n = digits[n&0x3f], n>>6
s[1], n = digits[n&0x3f], n>>6
s[0] = digits[n&0x3f]
}
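
For reference, basic use of the generator looks like this (machine id 0 matches what logger/fields.go above passes to snowflake.New):

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/pkg/snowflake"
)

func main() {
	gen := snowflake.New(0)

	fmt.Println(gen.Next())       // raw 64-bit ID
	fmt.Println(gen.NextString()) // fixed-width, sortable string form
}
```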

pkg/snowflake/gen_test.go (new file, 68 lines)

@ -0,0 +1,68 @@
package snowflake
import (
"fmt"
"math/rand"
"sort"
"testing"
"github.com/influxdata/influxdb/pkg/testing/assert"
)
func TestEncode(t *testing.T) {
tests := []struct {
v uint64
exp string
}{
{0x000, "00000000000"},
{0x001, "00000000001"},
{0x03f, "0000000000~"},
{0x07f, "0000000001~"},
{0xf07f07f07f07f07f, "F1~1~1~1~1~"},
}
for _, test := range tests {
t.Run(fmt.Sprintf("0x%03x→%s", test.v, test.exp), func(t *testing.T) {
var s [11]byte
encode(&s, test.v)
assert.Equal(t, string(s[:]), test.exp)
})
}
}
// TestSorting verifies that base-64 encoded values sort in the same order as their numerical values.
func TestSorting(t *testing.T) {
var (
vals = make([]string, 1000)
exp = make([]string, 1000)
)
for i := 0; i < len(vals); i++ {
var s [11]byte
encode(&s, uint64(i*47))
vals[i] = string(s[:])
exp[i] = string(s[:])
}
// randomize them
shuffle(len(vals), func(i, j int) {
vals[i], vals[j] = vals[j], vals[i]
})
sort.Strings(vals)
assert.Equal(t, vals, exp)
}
func BenchmarkEncode(b *testing.B) {
b.ReportAllocs()
var s [11]byte
for i := 0; i < b.N; i++ {
encode(&s, 100)
}
}
func shuffle(n int, swap func(i, j int)) {
for i := n - 1; i > 0; i-- {
j := rand.Intn(i + 1)
swap(i, j)
}
}


@ -15,6 +15,7 @@ import (
"collectd.org/api"
"collectd.org/network"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
@ -395,7 +396,7 @@ func (s *Service) writePoints() {
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Info("Required database not yet created",
zap.String("db", s.Config.Database), zap.Error(err))
logger.Database(s.Config.Database), zap.Error(err))
continue
}
@ -404,7 +405,7 @@ func (s *Service) writePoints() {
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.Logger.Info("Failed to write point batch to database",
zap.String("db", s.Config.Database), zap.Error(err))
logger.Database(s.Config.Database), zap.Error(err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}
}


@ -360,13 +360,20 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
return false, fmt.Errorf("unable to set time range: %s", err)
}
var start time.Time
var (
start time.Time
log = s.Logger
)
if s.loggingEnabled || s.queryStatsEnabled {
start = time.Now()
}
if s.loggingEnabled {
s.Logger.Info("Executing continuous query",
var logEnd func()
log, logEnd = logger.NewOperation(s.Logger, "Continuous query execution", "continuous_querier.execute")
defer logEnd()
log.Info("Executing continuous query",
zap.String("name", cq.Info.Name),
zap.Time("start", startTime),
zap.Time("end", endTime))
@ -391,7 +398,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
}
if s.loggingEnabled {
s.Logger.Info("Finished continuous query",
log.Info("Finished continuous query",
zap.String("name", cq.Info.Name),
zap.Int64("written", written),
zap.Time("start", startTime),


@ -463,7 +463,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.logger.Info("Failed to write point batch to database",
zap.String("db", s.database), zap.Error(err))
logger.Database(s.database), zap.Error(err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}


@ -24,6 +24,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/monitor/diagnostics"
@ -421,7 +422,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U
h.Logger.Info("Unauthorized request",
zap.String("user", err.User),
zap.Stringer("query", err.Query),
zap.String("db", err.Database))
logger.Database(err.Database))
}
h.httpError(rw, "error authorizing query: "+err.Error(), http.StatusForbidden)
return
@ -981,7 +982,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
h.Logger.Info("Unauthorized request",
zap.String("user", err.User),
zap.Stringer("query", err.Query),
zap.String("db", err.Database))
logger.Database(err.Database))
}
h.httpError(w, "error authorizing query: "+err.Error(), http.StatusForbidden)
return


@ -18,6 +18,7 @@ import (
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxql"
"go.uber.org/zap"
@ -800,9 +801,9 @@ func (c *Client) PrecreateShardGroups(from, to time.Time) error {
// if it already exists, continue
if sg, _ := data.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); sg != nil {
c.logger.Info("Shard group already exists",
zap.Uint64("id", sg.ID),
zap.String("db", di.Name),
zap.String("rp", rp.Name))
logger.ShardGroup(sg.ID),
logger.Database(di.Name),
logger.RetentionPolicy(rp.Name))
continue
}
newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime)
@ -813,9 +814,9 @@ func (c *Client) PrecreateShardGroups(from, to time.Time) error {
}
changed = true
c.logger.Info("New shard group successfully precreated",
zap.Uint64("group_id", newGroup.ID),
zap.String("db", di.Name),
zap.String("rp", rp.Name))
logger.ShardGroup(newGroup.ID),
logger.Database(di.Name),
logger.RetentionPolicy(rp.Name))
}
}
}


@ -15,6 +15,7 @@ import (
"sync/atomic"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
@ -464,7 +465,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
case batch := <-batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Info("Required database does not yet exist", zap.String("db", s.Database), zap.Error(err))
s.Logger.Info("Required database does not yet exist", logger.Database(s.Database), zap.Error(err))
continue
}
@ -473,7 +474,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.Logger.Info("Failed to write point batch to database",
zap.String("db", s.Database), zap.Error(err))
logger.Database(s.Database), zap.Error(err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}
}


@ -80,7 +80,7 @@ func (s *Service) run() {
return
case <-ticker.C:
s.logger.Info("Shard deletion check commencing")
log, logEnd := logger.NewOperation(s.logger, "Retention policy deletion check", "retention.delete_check")
type deletionInfo struct {
db string
@ -98,19 +98,19 @@ func (s *Service) run() {
for _, r := range d.RetentionPolicies {
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
s.logger.Info("Failed to delete shard group",
zap.Uint64("id", g.ID),
zap.String("db", d.Name),
zap.String("rp", r.Name),
log.Info("Failed to delete shard group",
logger.Database(d.Name),
logger.ShardGroup(g.ID),
logger.RetentionPolicy(r.Name),
zap.Error(err))
retryNeeded = true
continue
}
s.logger.Info("Deleted shard group",
zap.Uint64("id", g.ID),
zap.String("db", d.Name),
zap.String("rp", r.Name))
log.Info("Deleted shard group",
logger.Database(d.Name),
logger.ShardGroup(g.ID),
logger.RetentionPolicy(r.Name))
// Store all the shard IDs that may possibly need to be removed locally.
for _, sh := range g.Shards {
@ -124,29 +124,31 @@ func (s *Service) run() {
for _, id := range s.TSDBStore.ShardIDs() {
if info, ok := deletedShardIDs[id]; ok {
if err := s.TSDBStore.DeleteShard(id); err != nil {
s.logger.Info("Failed to delete shard",
zap.Uint64("id", id),
zap.String("db", info.db),
zap.String("rp", info.rp),
log.Info("Failed to delete shard",
logger.Database(info.db),
logger.Shard(id),
logger.RetentionPolicy(info.rp),
zap.Error(err))
retryNeeded = true
continue
}
s.logger.Info("Deleted shard",
zap.Uint64("id", id),
zap.String("db", info.db),
zap.String("rp", info.rp))
log.Info("Deleted shard",
logger.Database(info.db),
logger.Shard(id),
logger.RetentionPolicy(info.rp))
}
}
if err := s.MetaClient.PruneShardGroups(); err != nil {
s.logger.Info("Problem pruning shard groups", zap.Error(err))
log.Info("Problem pruning shard groups", zap.Error(err))
retryNeeded = true
}
if retryNeeded {
s.logger.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
log.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
}
logEnd()
}
}
}


@ -11,6 +11,7 @@ import (
"time"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/meta"
@ -315,8 +316,8 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) {
}
s.subs[se] = cw
s.Logger.Info("Added new subscription",
zap.String("db", se.db),
zap.String("rp", se.rp))
logger.Database(se.db),
logger.RetentionPolicy(se.rp))
}
}
}
@ -330,8 +331,8 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) {
// Remove it from the set
delete(s.subs, se)
s.Logger.Info("Deleted old subscription",
zap.String("db", se.db),
zap.String("rp", se.rp))
logger.Database(se.db),
logger.RetentionPolicy(se.rp))
}
}
}


@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
@ -161,7 +162,7 @@ func (s *Service) writer() {
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Info("Required database does not yet exist",
zap.String("db", s.config.Database), zap.Error(err))
logger.Database(s.config.Database), zap.Error(err))
continue
}
@ -170,7 +171,7 @@ func (s *Service) writer() {
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.Logger.Info("Failed to write point batch to database",
zap.String("db", s.config.Database), zap.Error(err))
logger.Database(s.config.Database), zap.Error(err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}


@ -20,6 +20,7 @@ import (
"sync/atomic"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/bytesutil"
"github.com/influxdata/influxdb/pkg/estimator"
@ -1558,24 +1559,22 @@ func (e *Engine) WriteSnapshot() error {
// Lock and grab the cache snapshot along with all the closed WAL
// filenames associated with the snapshot
var started *time.Time
started := time.Now()
log, logEnd := logger.NewOperation(e.logger, "Cache snapshot", "cache.snapshot")
defer func() {
if started != nil {
e.Cache.UpdateCompactTime(time.Since(*started))
e.logger.Info("Snapshot for path written",
zap.String("path", e.path),
zap.Duration("duration", time.Since(*started)))
}
elapsed := time.Since(started)
e.Cache.UpdateCompactTime(elapsed)
e.logger.Info("Snapshot for path written",
zap.String("path", e.path),
zap.Duration("duration", elapsed))
logEnd()
}()
closedFiles, snapshot, err := func() ([]string, *Cache, error) {
e.mu.Lock()
defer e.mu.Unlock()
now := time.Now()
started = &now
if err := e.WAL.CloseSegment(); err != nil {
return nil, nil, err
}
@ -1611,7 +1610,7 @@ func (e *Engine) WriteSnapshot() error {
zap.String("path", e.path),
zap.Duration("duration", time.Since(dedup)))
return e.writeSnapshotAndCommit(closedFiles, snapshot)
return e.writeSnapshotAndCommit(log, closedFiles, snapshot)
}
// CreateSnapshot will create a temp directory that holds
@ -1633,7 +1632,7 @@ func (e *Engine) CreateSnapshot() (string, error) {
}
// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments.
func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (err error) {
func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) {
defer func() {
if err != nil {
e.Cache.ClearSnapshot(false)
@ -1643,7 +1642,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
// write the new snapshot files
newFiles, err := e.Compactor.WriteSnapshot(snapshot)
if err != nil {
e.logger.Info("Error writing snapshot from compactor", zap.Error(err))
log.Info("Error writing snapshot from compactor", zap.Error(err))
return err
}
@ -1652,7 +1651,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
// update the file store with these new files
if err := e.FileStore.Replace(nil, newFiles); err != nil {
e.logger.Info("Error adding new TSM files from snapshot", zap.Error(err))
log.Info("Error adding new TSM files from snapshot", zap.Error(err))
return err
}
@ -1660,7 +1659,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
e.Cache.ClearSnapshot(true)
if err := e.WAL.Remove(closedFiles); err != nil {
e.logger.Info("Error removing closed WAL segments", zap.Error(err))
log.Info("Error removing closed WAL segments", zap.Error(err))
}
return nil
@ -1913,9 +1912,12 @@ func (s *compactionStrategy) Apply() {
func (s *compactionStrategy) compactGroup() {
group := s.group
start := time.Now()
s.logger.Info("Beginning compaction", zap.Int("files", len(group)))
log, logEnd := logger.NewOperation(s.logger, "TSM compaction", "tsm1.compact_group")
defer logEnd()
log.Info("Beginning compaction", zap.Int("files", len(group)))
for i, f := range group {
s.logger.Info("Compacting file", zap.Int("index", i), zap.String("file", f))
log.Info("Compacting file", zap.Int("index", i), zap.String("file", f))
}
var (
@ -1932,7 +1934,7 @@ func (s *compactionStrategy) compactGroup() {
if err != nil {
_, inProgress := err.(errCompactionInProgress)
if err == errCompactionsDisabled || inProgress {
s.logger.Info("Aborted compaction", zap.Error(err))
log.Info("Aborted compaction", zap.Error(err))
if _, ok := err.(errCompactionInProgress); ok {
time.Sleep(time.Second)
@ -1940,23 +1942,23 @@ func (s *compactionStrategy) compactGroup() {
return
}
s.logger.Info("Error compacting TSM files", zap.Error(err))
log.Info("Error compacting TSM files", zap.Error(err))
atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second)
return
}
if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil {
s.logger.Info("Error replacing new TSM files", zap.Error(err))
log.Info("Error replacing new TSM files", zap.Error(err))
atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second)
return
}
for i, f := range files {
s.logger.Info("Compacted file", zap.Int("index", i), zap.String("file", f))
log.Info("Compacted file", zap.Int("index", i), zap.String("file", f))
}
s.logger.Info("Finished compacting files",
log.Info("Finished compacting files",
zap.Int("groups", len(group)),
zap.Int("files", len(files)),
zap.Duration("duration", time.Since(start)))


@ -1,7 +1,6 @@
package tsi1
import (
"crypto/rand"
"encoding/json"
"errors"
"fmt"
@ -14,6 +13,7 @@ import (
"sync"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/bytesutil"
"github.com/influxdata/influxdb/pkg/estimator"
@ -887,12 +887,13 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
assert(level > 0, "cannot compact level zero")
// Build a logger for this compaction.
logger := i.logger.With(zap.String("token", generateCompactionToken()))
log, logEnd := logger.NewOperation(i.logger, "TSI level compaction", "index.tsi.compact_to_level", zap.Int("tsi_level", level))
defer logEnd()
// Check for cancellation.
select {
case <-interrupt:
logger.Error("cannot begin compaction", zap.Error(ErrCompactionInterrupted))
log.Error("Cannot begin compaction", zap.Error(ErrCompactionInterrupted))
return
default:
}
@ -909,12 +910,12 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
path := filepath.Join(i.path, FormatIndexFileName(i.NextSequence(), level))
f, err := os.Create(path)
if err != nil {
logger.Error("cannot create compaction files", zap.Error(err))
log.Error("Cannot create compaction files", zap.Error(err))
return
}
defer f.Close()
logger.Info("performing full compaction",
log.Info("Performing full compaction",
zap.String("src", joinIntSlice(IndexFiles(files).IDs(), ",")),
zap.String("dst", path),
)
@ -923,13 +924,13 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
lvl := i.levels[level]
n, err := IndexFiles(files).CompactTo(f, i.sfile, lvl.M, lvl.K, interrupt)
if err != nil {
logger.Error("cannot compact index files", zap.Error(err))
log.Error("Cannot compact index files", zap.Error(err))
return
}
// Close file.
if err := f.Close(); err != nil {
logger.Error("error closing index file", zap.Error(err))
log.Error("Error closing index file", zap.Error(err))
return
}
@ -937,7 +938,7 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
file := NewIndexFile(i.sfile)
file.SetPath(path)
if err := file.Open(); err != nil {
logger.Error("cannot open new index file", zap.Error(err))
log.Error("Cannot open new index file", zap.Error(err))
return
}
@ -958,14 +959,14 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
i.manifestSize = manifestSize
return nil
}(); err != nil {
logger.Error("cannot write manifest", zap.Error(err))
log.Error("Cannot write manifest", zap.Error(err))
return
}
elapsed := time.Since(start)
logger.Info("full compaction complete",
log.Info("Full compaction complete",
zap.String("path", path),
zap.String("elapsed", elapsed.String()),
logger.DurationLiteral("elapsed", elapsed),
zap.Int64("bytes", n),
zap.Int("kb_per_sec", int(float64(n)/elapsed.Seconds())/1024),
)
@ -975,13 +976,13 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
// Close and delete all old index files.
for _, f := range files {
logger.Info("removing index file", zap.String("path", f.Path()))
log.Info("Removing index file", zap.String("path", f.Path()))
if err := f.Close(); err != nil {
logger.Error("cannot close index file", zap.Error(err))
log.Error("Cannot close index file", zap.Error(err))
return
} else if err := os.Remove(f.Path()); err != nil {
logger.Error("cannot remove index file", zap.Error(err))
log.Error("Cannot remove index file", zap.Error(err))
return
}
}
@ -1048,16 +1049,14 @@ func (i *Partition) compactLogFile(logFile *LogFile) {
assert(id != 0, "cannot parse log file id: %s", logFile.Path())
// Build a logger for this compaction.
logger := i.logger.With(
zap.String("token", generateCompactionToken()),
zap.Int("id", id),
)
log, logEnd := logger.NewOperation(i.logger, "TSI log compaction", "index.tsi.compact_log_file", zap.Int("log_file_id", id))
defer logEnd()
// Create new index file.
path := filepath.Join(i.path, FormatIndexFileName(id, 1))
f, err := os.Create(path)
if err != nil {
logger.Error("cannot create index file", zap.Error(err))
log.Error("Cannot create index file", zap.Error(err))
return
}
defer f.Close()
@ -1066,13 +1065,13 @@ func (i *Partition) compactLogFile(logFile *LogFile) {
lvl := i.levels[1]
n, err := logFile.CompactTo(f, lvl.M, lvl.K, interrupt)
if err != nil {
logger.Error("cannot compact log file", zap.Error(err), zap.String("path", logFile.Path()))
log.Error("Cannot compact log file", zap.Error(err), zap.String("path", logFile.Path()))
return
}
// Close file.
if err := f.Close(); err != nil {
logger.Error("cannot close log file", zap.Error(err))
log.Error("Cannot close log file", zap.Error(err))
return
}
@ -1080,7 +1079,7 @@ func (i *Partition) compactLogFile(logFile *LogFile) {
file := NewIndexFile(i.sfile)
file.SetPath(path)
if err := file.Open(); err != nil {
logger.Error("cannot open compacted index file", zap.Error(err), zap.String("path", file.Path()))
log.Error("Cannot open compacted index file", zap.Error(err), zap.String("path", file.Path()))
return
}
@ -1102,23 +1101,23 @@ func (i *Partition) compactLogFile(logFile *LogFile) {
i.manifestSize = manifestSize
return nil
}(); err != nil {
logger.Error("cannot update manifest", zap.Error(err))
log.Error("Cannot update manifest", zap.Error(err))
return
}
elapsed := time.Since(start)
logger.Info("log file compacted",
zap.String("elapsed", elapsed.String()),
log.Info("Log file compacted",
logger.DurationLiteral("elapsed", elapsed),
zap.Int64("bytes", n),
zap.Int("kb_per_sec", int(float64(n)/elapsed.Seconds())/1024),
)
// Closing the log file will automatically wait until the ref count is zero.
if err := logFile.Close(); err != nil {
logger.Error("cannot close log file", zap.Error(err))
log.Error("Cannot close log file", zap.Error(err))
return
} else if err := os.Remove(logFile.Path()); err != nil {
logger.Error("cannot remove log file", zap.Error(err))
log.Error("Cannot remove log file", zap.Error(err))
return
}
}
@ -1277,11 +1276,3 @@ const MaxIndexMergeCount = 2
// MaxIndexFileSize is the maximum expected size of an index file.
const MaxIndexFileSize = 4 * (1 << 30)
// generateCompactionToken returns a short token to track an individual compaction.
// It is only used for logging so it doesn't need strong uniqueness guarantees.
func generateCompactionToken() string {
token := make([]byte, 3)
rand.Read(token)
return fmt.Sprintf("%x", token)
}


@ -8,8 +8,8 @@ import (
"os"
"path/filepath"
"sync"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/rhh"
"go.uber.org/zap"
@ -258,10 +258,8 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
// Check if we've crossed the compaction threshold.
if p.compactionsEnabled() && !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) {
p.compacting = true
logger := p.Logger.With(zap.String("path", p.path))
logger.Info("beginning series partition compaction")
log, logEnd := logger.NewOperation(p.Logger, "Series partition compaction", "series_partition.compaction", zap.String("path", p.path))
startTime := time.Now()
p.wg.Add(1)
go func() {
defer p.wg.Done()
@ -269,10 +267,10 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
compactor := NewSeriesPartitionCompactor()
compactor.cancel = p.closing
if err := compactor.Compact(p); err != nil {
logger.With(zap.Error(err)).Error("series partition compaction failed")
log.Error("series partition compaction failed", zap.Error(err))
}
logger.With(zap.Duration("elapsed", time.Since(startTime))).Info("completed series partition compaction")
logEnd()
// Clear compaction flag.
p.mu.Lock()


@ -15,6 +15,7 @@ import (
"sync"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/pkg/estimator/hll"
"github.com/influxdata/influxdb/models"
@ -203,6 +204,9 @@ func (s *Store) loadShards() error {
s.Logger.Info("Compaction throughput limit disabled")
}
log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb.open")
defer logEnd()
t := limiter.NewFixed(runtime.GOMAXPROCS(0))
resC := make(chan *res)
var n int
@ -214,8 +218,9 @@ func (s *Store) loadShards() error {
}
for _, db := range dbDirs {
dbPath := filepath.Join(s.path, db.Name())
if !db.IsDir() {
s.Logger.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
continue
}
@ -232,14 +237,15 @@ func (s *Store) loadShards() error {
}
// Load each retention policy within the database directory.
rpDirs, err := ioutil.ReadDir(filepath.Join(s.path, db.Name()))
rpDirs, err := ioutil.ReadDir(dbPath)
if err != nil {
return err
}
for _, rp := range rpDirs {
rpPath := filepath.Join(s.path, db.Name(), rp.Name())
if !rp.IsDir() {
s.Logger.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
continue
}
@ -248,7 +254,7 @@ func (s *Store) loadShards() error {
continue
}
shardDirs, err := ioutil.ReadDir(filepath.Join(s.path, db.Name(), rp.Name()))
shardDirs, err := ioutil.ReadDir(rpPath)
if err != nil {
return err
}
@ -266,6 +272,7 @@ func (s *Store) loadShards() error {
// Shard file names are numeric shardIDs
shardID, err := strconv.ParseUint(sh, 10, 64)
if err != nil {
log.Info("invalid shard ID found at path", zap.String("path", path))
resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
return
}
@ -291,12 +298,13 @@ func (s *Store) loadShards() error {
err = shard.Open()
if err != nil {
log.Info("Failed to open shard", logger.Shard(shardID), zap.Error(err))
resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)}
return
}
resC <- &res{s: shard}
s.Logger.Info("Opened shard", zap.String("path", path), zap.Duration("duration", time.Since(start)))
log.Info("Opened shard", zap.String("path", path), zap.Duration("duration", time.Since(start)))
}(db.Name(), rp.Name(), sh.Name())
}
}
@ -307,7 +315,6 @@ func (s *Store) loadShards() error {
for i := 0; i < n; i++ {
res := <-resC
if res.err != nil {
s.Logger.Info(res.err.Error())
continue
}
s.shards[res.s.id] = res.s
@ -1764,7 +1771,7 @@ func (s *Store) monitorShards() {
zap.String("perc", fmt.Sprintf("%d%%", perc)),
zap.Int("n", n),
zap.Int("max", s.EngineOptions.Config.MaxValuesPerTag),
zap.String("db", db),
logger.Database(db),
zap.ByteString("measurement", name),
zap.ByteString("tag", k))
}