Merge pull request #8427 from benbjohnson/tsi-zap

Implement zap logging in TSI.
pull/8381/merge
Ben Johnson 2017-05-25 12:00:30 -06:00 committed by GitHub
commit c767b65766
4 changed files with 71 additions and 20 deletions

View File

@ -9,11 +9,13 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/uber-go/zap"
)
type Index interface {
Open() error
Close() error
WithLogger(zap.Logger)
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)

View File

@ -26,6 +26,7 @@ import (
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/estimator/hll"
"github.com/influxdata/influxdb/tsdb"
"github.com/uber-go/zap"
)
// IndexName is the name of this index.
@ -73,6 +74,8 @@ func (i *Index) Type() string { return IndexName }
// Open is a no-op for this index implementation; it always succeeds.
func (i *Index) Open() error { return nil }
// Close is a no-op for this index implementation; it always succeeds.
func (i *Index) Close() error { return nil }
// WithLogger is a no-op for this index implementation; the logger is discarded.
func (i *Index) WithLogger(zap.Logger) {}
// Series returns a series by key.
func (i *Index) Series(key []byte) (*Series, error) {
i.mu.RLock()

View File

@ -1,11 +1,11 @@
package tsi1
import (
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"regexp"
@ -19,6 +19,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/tsdb"
"github.com/uber-go/zap"
)
// IndexName is the name of the index.
@ -87,6 +88,8 @@ type Index struct {
// Frequency of compaction checks.
CompactionEnabled bool
CompactionMonitorInterval time.Duration
logger zap.Logger
}
// NewIndex returns a new instance of Index.
@ -97,6 +100,8 @@ func NewIndex() *Index {
// Default compaction thresholds.
MaxLogFileSize: DefaultMaxLogFileSize,
CompactionEnabled: true,
logger: zap.New(zap.NullEncoder()),
}
}
@ -288,6 +293,11 @@ func (i *Index) writeManifestFile() error {
return WriteManifestFile(i.ManifestPath(), i.Manifest())
}
// WithLogger sets the logger used by the index. Every entry emitted
// through it is tagged with an "index"="tsi" field.
func (i *Index) WithLogger(logger zap.Logger) {
	tagged := logger.With(zap.String("index", "tsi"))
	i.logger = tagged
}
// SetFieldSet sets a shared field set from the engine.
func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
i.mu.Lock()
@ -844,6 +854,9 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) {
assert(len(files) >= 2, "at least two index files are required for compaction")
assert(level > 0, "cannot compact level zero")
// Build a logger for this compaction.
logger := i.logger.With(zap.String("token", generateCompactionToken()))
// Files have already been retained by caller.
// Ensure files are released only once.
var once sync.Once
@ -856,25 +869,27 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) {
path := filepath.Join(i.Path, FormatIndexFileName(i.NextSequence(), level))
f, err := os.Create(path)
if err != nil {
log.Printf("%s: error creating compaction files: %s", IndexName, err)
logger.Error("cannot create compaction files", zap.Error(err))
return
}
defer f.Close()
srcIDs := joinIntSlice(IndexFiles(files).IDs(), ",")
log.Printf("%s: performing full compaction: src=%s, path=%s", IndexName, srcIDs, filepath.Base(path))
logger.Info("performing full compaction",
zap.String("src", joinIntSlice(IndexFiles(files).IDs(), ",")),
zap.String("dst", path),
)
// Compact all index files to new index file.
lvl := i.levels[level]
n, err := IndexFiles(files).CompactTo(f, lvl.M, lvl.K)
if err != nil {
log.Printf("%s: error compacting index files: src=%s, path=%s, err=%s", IndexName, srcIDs, path, err)
logger.Error("cannot compact index files", zap.Error(err))
return
}
// Close file.
if err := f.Close(); err != nil {
log.Printf("%s: error closing index file: %s", IndexName, err)
logger.Error("error closing index file", zap.Error(err))
return
}
@ -882,7 +897,7 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) {
file := NewIndexFile()
file.SetPath(path)
if err := file.Open(); err != nil {
log.Printf("%s: error opening new index file: %s", IndexName, err)
logger.Error("cannot open new index file", zap.Error(err))
return
}
@ -901,23 +916,30 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) {
}
return nil
}(); err != nil {
log.Printf("%s: error writing manifest: %s", IndexName, err)
logger.Error("cannot write manifest", zap.Error(err))
return
}
log.Printf("%s: full compaction complete: file=%s, t=%s, sz=%d", IndexName, path, time.Since(start), n)
elapsed := time.Since(start)
logger.Info("full compaction complete",
zap.String("path", path),
zap.String("elapsed", elapsed.String()),
zap.Int64("bytes", n),
zap.Int("kb_per_sec", int(float64(n)/elapsed.Seconds())/1024),
)
// Release old files.
once.Do(func() { IndexFiles(files).Release() })
// Close and delete all old index files.
for _, f := range files {
log.Printf("%s: removing index file: file=%s", IndexName, f.Path())
logger.Info("removing index file", zap.String("path", f.Path()))
if err := f.Close(); err != nil {
log.Printf("%s: error closing index file: %s", IndexName, err)
logger.Error("cannot close index file", zap.Error(err))
return
} else if err := os.Remove(f.Path()); err != nil {
log.Printf("%s: error removing index file: %s", IndexName, err)
logger.Error("cannot remove index file", zap.Error(err))
return
}
}
@ -973,25 +995,33 @@ func (i *Index) compactLogFile(logFile *LogFile) {
id := logFile.ID()
assert(id != 0, "cannot parse log file id: %s", logFile.Path())
// Build a logger for this compaction.
logger := i.logger.With(
zap.String("token", generateCompactionToken()),
zap.Int("id", id),
)
// Create new index file.
path := filepath.Join(i.Path, FormatIndexFileName(id, 1))
f, err := os.Create(path)
if err != nil {
log.Printf("tsi1: error creating index file: %s", err)
logger.Error("cannot create index file", zap.Error(err))
return
}
defer f.Close()
// Compact log file to new index file.
lvl := i.levels[1]
if _, err := logFile.CompactTo(f, lvl.M, lvl.K); err != nil {
n, err := logFile.CompactTo(f, lvl.M, lvl.K)
if err != nil {
log.Printf("%s: error compacting log file: path=%s, err=%s", IndexName, logFile.Path(), err)
logger.Error("cannot compact log file", zap.Error(err), zap.String("path", logFile.Path()))
return
}
// Close file.
if err := f.Close(); err != nil {
log.Printf("tsi1: error closing log file: %s", err)
logger.Error("cannot close log file", zap.Error(err))
return
}
@ -999,7 +1029,7 @@ func (i *Index) compactLogFile(logFile *LogFile) {
file := NewIndexFile()
file.SetPath(path)
if err := file.Open(); err != nil {
log.Printf("tsi1: error opening compacted index file: path=%s, err=%s", file.Path(), err)
logger.Error("cannot open compacted index file", zap.Error(err), zap.String("path", file.Path()))
return
}
@ -1018,17 +1048,23 @@ func (i *Index) compactLogFile(logFile *LogFile) {
}
return nil
}(); err != nil {
log.Printf("%s: error updating manifest: %s", IndexName, err)
logger.Error("cannot update manifest", zap.Error(err))
return
}
log.Printf("%s: log file compacted: file=%s, t=%0.03fs", IndexName, filepath.Base(logFile.Path()), time.Since(start).Seconds())
elapsed := time.Since(start)
logger.Info("log file compacted",
zap.String("elapsed", elapsed.String()),
zap.Int64("bytes", n),
zap.Int("kb_per_sec", int(float64(n)/elapsed.Seconds())/1024),
)
// Closing the log file will automatically wait until the ref count is zero.
if err := logFile.Close(); err != nil {
log.Printf("%s: error closing log file: %s", IndexName, err)
logger.Error("cannot close log file", zap.Error(err))
return
} else if err := os.Remove(logFile.Path()); err != nil {
log.Printf("%s: error removing log file: %s", IndexName, err)
logger.Error("cannot remove log file", zap.Error(err))
return
}
@ -1289,3 +1325,11 @@ const MaxIndexMergeCount = 2
// MaxIndexFileSize is the maximum expected size of an index file.
const MaxIndexFileSize = 4 * (1 << 30)
// generateCompactionToken returns a short random hex token (6 hex chars)
// used to correlate the log entries of an individual compaction.
// It is only used for logging so it doesn't need strong uniqueness guarantees.
func generateCompactionToken() string {
	token := make([]byte, 3)
	// Best-effort: explicitly discard the error from crypto/rand. If the
	// random source fails the token is all zeros, which is still an
	// acceptable correlation id for logging purposes.
	_, _ = rand.Read(token)
	return fmt.Sprintf("%x", token)
}

View File

@ -170,6 +170,7 @@ func (s *Shard) WithLogger(log zap.Logger) {
// Keep the untagged base logger so it can be handed to sub-components;
// the engine and index only receive it when the shard reports ready.
s.baseLogger = log
if err := s.ready(); err == nil {
s.engine.WithLogger(s.baseLogger)
s.index.WithLogger(s.baseLogger)
}
// The shard's own entries are tagged with service=shard.
s.logger = s.baseLogger.With(zap.String("service", "shard"))
}
@ -274,6 +275,7 @@ func (s *Shard) Open() error {
return err
}
s.index = idx
idx.WithLogger(s.baseLogger)
// Initialize underlying engine.
e, err := NewEngine(s.id, idx, s.path, s.walPath, s.options)