From 24446a0297b8ac66ebd01a6f4a6dfa9758145b13 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 25 May 2017 08:56:47 -0600 Subject: [PATCH] Implement zap logging in TSI. --- tsdb/index.go | 2 + tsdb/index/inmem/inmem.go | 3 ++ tsdb/index/tsi1/index.go | 84 +++++++++++++++++++++++++++++---------- tsdb/shard.go | 2 + 4 files changed, 71 insertions(+), 20 deletions(-) diff --git a/tsdb/index.go b/tsdb/index.go index c05617c7a0..3a7f3cb477 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -9,11 +9,13 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/estimator" + "github.com/uber-go/zap" ) type Index interface { Open() error Close() error + WithLogger(zap.Logger) MeasurementExists(name []byte) (bool, error) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index ebfd32e3fa..176807361a 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -26,6 +26,7 @@ import ( "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/estimator/hll" "github.com/influxdata/influxdb/tsdb" + "github.com/uber-go/zap" ) // IndexName is the name of this index. @@ -73,6 +74,8 @@ func (i *Index) Type() string { return IndexName } func (i *Index) Open() (err error) { return nil } func (i *Index) Close() error { return nil } +func (i *Index) WithLogger(zap.Logger) {} + // Series returns a series by key. func (i *Index) Series(key []byte) (*Series, error) { i.mu.RLock() diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 3cec6b5eff..acc69120bb 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -1,11 +1,11 @@ package tsi1 import ( + "crypto/rand" "encoding/json" "errors" "fmt" "io/ioutil" - "log" "os" "path/filepath" "regexp" @@ -19,6 +19,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/tsdb" + "github.com/uber-go/zap" ) // IndexName is the name of the index. @@ -87,6 +88,8 @@ type Index struct { // Frequency of compaction checks. CompactionEnabled bool CompactionMonitorInterval time.Duration + + logger zap.Logger } // NewIndex returns a new instance of Index. @@ -97,6 +100,8 @@ func NewIndex() *Index { // Default compaction thresholds. MaxLogFileSize: DefaultMaxLogFileSize, CompactionEnabled: true, + + logger: zap.New(zap.NullEncoder()), } } @@ -288,6 +293,11 @@ func (i *Index) writeManifestFile() error { return WriteManifestFile(i.ManifestPath(), i.Manifest()) } +// WithLogger sets the logger for the index. +func (i *Index) WithLogger(logger zap.Logger) { + i.logger = logger.With(zap.String("index", "tsi")) +} + // SetFieldSet sets a shared field set from the engine. func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) { i.mu.Lock() @@ -844,6 +854,9 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) { assert(len(files) >= 2, "at least two index files are required for compaction") assert(level > 0, "cannot compact level zero") + // Build a logger for this compaction. + logger := i.logger.With(zap.String("token", generateCompactionToken())) + // Files have already been retained by caller. // Ensure files are released only once. var once sync.Once @@ -856,25 +869,27 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) { path := filepath.Join(i.Path, FormatIndexFileName(i.NextSequence(), level)) f, err := os.Create(path) if err != nil { - log.Printf("%s: error creating compaction files: %s", IndexName, err) + logger.Error("cannot create compation files", zap.Error(err)) return } defer f.Close() - srcIDs := joinIntSlice(IndexFiles(files).IDs(), ",") - log.Printf("%s: performing full compaction: src=%s, path=%s", IndexName, srcIDs, filepath.Base(path)) + logger.Info("performing full compaction", + zap.String("src", joinIntSlice(IndexFiles(files).IDs(), ",")), + zap.String("dst", path), + ) // Compact all index files to new index file. lvl := i.levels[level] n, err := IndexFiles(files).CompactTo(f, lvl.M, lvl.K) if err != nil { - log.Printf("%s: error compacting index files: src=%s, path=%s, err=%s", IndexName, srcIDs, path, err) + logger.Error("cannot compact index files", zap.Error(err)) return } // Close file. if err := f.Close(); err != nil { - log.Printf("%s: error closing index file: %s", IndexName, err) + logger.Error("error closing index file", zap.Error(err)) return } @@ -882,7 +897,7 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) { file := NewIndexFile() file.SetPath(path) if err := file.Open(); err != nil { - log.Printf("%s: error opening new index file: %s", IndexName, err) + logger.Error("cannot open new index file", zap.Error(err)) return } @@ -901,23 +916,30 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) { } return nil }(); err != nil { - log.Printf("%s: error writing manifest: %s", IndexName, err) + logger.Error("cannot write manifest", zap.Error(err)) return } - log.Printf("%s: full compaction complete: file=%s, t=%s, sz=%d", IndexName, path, time.Since(start), n) + + elapsed := time.Since(start) + logger.Info("full compaction complete", + zap.String("path", path), + zap.String("elapsed", elapsed.String()), + zap.Int64("bytes", n), + zap.Int("kb_per_sec", int(float64(n)/elapsed.Seconds())/1024), + ) // Release old files. once.Do(func() { IndexFiles(files).Release() }) // Close and delete all old index files. for _, f := range files { - log.Printf("%s: removing index file: file=%s", IndexName, f.Path()) + logger.Info("removing index file", zap.String("path", f.Path())) if err := f.Close(); err != nil { - log.Printf("%s: error closing index file: %s", IndexName, err) + logger.Error("cannot close index file", zap.Error(err)) return } else if err := os.Remove(f.Path()); err != nil { - log.Printf("%s: error removing index file: %s", IndexName, err) + logger.Error("cannot remove index file", zap.Error(err)) return } } @@ -973,25 +995,33 @@ func (i *Index) compactLogFile(logFile *LogFile) { id := logFile.ID() assert(id != 0, "cannot parse log file id: %s", logFile.Path()) + // Build a logger for this compaction. + logger := i.logger.With( + zap.String("token", generateCompactionToken()), + zap.Int("id", id), + ) + // Create new index file. path := filepath.Join(i.Path, FormatIndexFileName(id, 1)) f, err := os.Create(path) if err != nil { - log.Printf("tsi1: error creating index file: %s", err) + logger.Error("cannot create index file", zap.Error(err)) return } defer f.Close() // Compact log file to new index file. lvl := i.levels[1] - if _, err := logFile.CompactTo(f, lvl.M, lvl.K); err != nil { + n, err := logFile.CompactTo(f, lvl.M, lvl.K) + if err != nil { log.Printf("%s: error compacting log file: path=%s, err=%s", IndexName, logFile.Path(), err) + logger.Error("cannot compact log file", zap.Error(err), zap.String("path", logFile.Path())) return } // Close file. if err := f.Close(); err != nil { - log.Printf("tsi1: error closing log file: %s", err) + logger.Error("cannot close log file", zap.Error(err)) return } @@ -999,7 +1029,7 @@ func (i *Index) compactLogFile(logFile *LogFile) { file := NewIndexFile() file.SetPath(path) if err := file.Open(); err != nil { - log.Printf("tsi1: error opening compacted index file: path=%s, err=%s", file.Path(), err) + logger.Error("cannot open compacted index file", zap.Error(err), zap.String("path", file.Path())) return } @@ -1018,17 +1048,23 @@ func (i *Index) compactLogFile(logFile *LogFile) { } return nil }(); err != nil { - log.Printf("%s: error updating manifest: %s", IndexName, err) + logger.Error("cannot update manifest", zap.Error(err)) return } - log.Printf("%s: log file compacted: file=%s, t=%0.03fs", IndexName, filepath.Base(logFile.Path()), time.Since(start).Seconds()) + + elapsed := time.Since(start) + logger.Error("log file compacted", + zap.String("elapsed", elapsed.String()), + zap.Int64("bytes", n), + zap.Int("kb_per_sec", int(float64(n)/elapsed.Seconds())/1024), + ) // Closing the log file will automatically wait until the ref count is zero. if err := logFile.Close(); err != nil { - log.Printf("%s: error closing log file: %s", IndexName, err) + logger.Error("cannot close log file", zap.Error(err)) return } else if err := os.Remove(logFile.Path()); err != nil { - log.Printf("%s: error removing log file: %s", IndexName, err) + logger.Error("cannot remove log file", zap.Error(err)) return } @@ -1289,3 +1325,11 @@ const MaxIndexMergeCount = 2 // MaxIndexFileSize is the maximum expected size of an index file. const MaxIndexFileSize = 4 * (1 << 30) + +// generateCompactionToken returns a short token to track an individual compaction. +// It is only used for logging so it doesn't need strong uniqueness guarantees. +func generateCompactionToken() string { + token := make([]byte, 3) + rand.Read(token) + return fmt.Sprintf("%x", token) +} diff --git a/tsdb/shard.go b/tsdb/shard.go index 01c069bcd0..9eeb94df28 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -170,6 +170,7 @@ func (s *Shard) WithLogger(log zap.Logger) { s.baseLogger = log if err := s.ready(); err == nil { s.engine.WithLogger(s.baseLogger) + s.index.WithLogger(s.baseLogger) } s.logger = s.baseLogger.With(zap.String("service", "shard")) } @@ -274,6 +275,7 @@ func (s *Shard) Open() error { return err } s.index = idx + idx.WithLogger(s.baseLogger) // Initialize underlying engine. e, err := NewEngine(s.id, idx, s.path, s.walPath, s.options)