Update logging calls to take advantage of structured logging

Includes a style guide that details the basics of how to log.
pull/9454/head
Jonathan A. Sternberg 2018-02-15 16:47:08 -05:00
parent 1a3af441cb
commit 2bbd96768d
25 changed files with 493 additions and 165 deletions

View File

@ -96,11 +96,11 @@ func (m *Main) Run(args ...string) error {
cmd.Logger.Info("Waiting for clean shutdown...")
select {
case <-signalCh:
cmd.Logger.Info("second signal received, initializing hard shutdown")
cmd.Logger.Info("Second signal received, initializing hard shutdown")
case <-time.After(time.Second * 30):
cmd.Logger.Info("time limit reached, initializing hard shutdown")
cmd.Logger.Info("Time limit reached, initializing hard shutdown")
case <-cmd.Closed:
cmd.Logger.Info("server shutdown completed")
cmd.Logger.Info("Server shutdown completed")
}
// goodbye.

View File

@ -99,9 +99,13 @@ func (cmd *Command) Run(args ...string) error {
}
// Mark start-up in log.
cmd.Logger.Info(fmt.Sprintf("InfluxDB starting, version %s, branch %s, commit %s",
cmd.Version, cmd.Branch, cmd.Commit))
cmd.Logger.Info(fmt.Sprintf("Go version %s, GOMAXPROCS set to %d", runtime.Version(), runtime.GOMAXPROCS(0)))
cmd.Logger.Info("InfluxDB starting",
zap.String("version", cmd.Version),
zap.String("branch", cmd.Branch),
zap.String("commit", cmd.Commit))
cmd.Logger.Info("Go runtime",
zap.String("version", runtime.Version()),
zap.Int("maxprocs", runtime.GOMAXPROCS(0)))
// If there was an error on startup when creating the logger, output it now.
if logErr != nil {
@ -187,7 +191,7 @@ func (cmd *Command) monitorServerErrors() {
func (cmd *Command) removePIDFile() {
if cmd.pidfile != "" {
if err := os.Remove(cmd.pidfile); err != nil {
cmd.Logger.Error("unable to remove pidfile", zap.Error(err))
cmd.Logger.Error("Unable to remove pidfile", zap.Error(err))
}
}
}
@ -235,11 +239,11 @@ func (cmd *Command) writePIDFile(path string) error {
func (cmd *Command) ParseConfig(path string) (*Config, error) {
// Use demo configuration if no config path is specified.
if path == "" {
cmd.Logger.Info("no configuration provided, using default settings")
cmd.Logger.Info("No configuration provided, using default settings")
return NewDemoConfig()
}
cmd.Logger.Info(fmt.Sprintf("Using configuration at: %s", path))
cmd.Logger.Info("Loading configuration file", zap.String("path", path))
config := NewConfig()
if err := config.FromTomlFile(path); err != nil {

View File

@ -2,7 +2,6 @@ package coordinator
import (
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
@ -380,7 +379,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
if err == tsdb.ErrShardNotFound {
err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true)
if err != nil {
w.Logger.Info(fmt.Sprintf("write failed for shard %d: %v", shard.ID, err))
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
@ -388,7 +387,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
}
err = w.TSDBStore.WriteToShard(shard.ID, points)
if err != nil {
w.Logger.Info(fmt.Sprintf("write failed for shard %d: %v", shard.ID, err))
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}

View File

@ -11,6 +11,8 @@ import (
"go.uber.org/zap/zapcore"
)
const TimeFormat = "2006-01-02T15:04:05.000000Z07:00"
func New(w io.Writer) *zap.Logger {
config := NewConfig()
l, _ := config.New(w)
@ -63,11 +65,13 @@ func newEncoder(format string) (zapcore.Encoder, error) {
func newEncoderConfig() zapcore.EncoderConfig {
config := zap.NewProductionEncoderConfig()
config.EncodeTime = func(ts time.Time, encoder zapcore.PrimitiveArrayEncoder) {
encoder.AppendString(ts.UTC().Format(time.RFC3339))
encoder.AppendString(ts.UTC().Format(TimeFormat))
}
config.EncodeDuration = func(d time.Duration, encoder zapcore.PrimitiveArrayEncoder) {
encoder.AppendString(d.String())
val := float64(d) / float64(time.Millisecond)
encoder.AppendString(fmt.Sprintf("%.3fms", val))
}
config.LevelKey = "lvl"
return config
}
@ -80,3 +84,44 @@ func IsTerminal(w io.Writer) bool {
}
return false
}
const (
year = 365 * 24 * time.Hour
week = 7 * 24 * time.Hour
day = 24 * time.Hour
)
func DurationLiteral(key string, val time.Duration) zapcore.Field {
if val == 0 {
return zap.String(key, "0s")
}
var (
value int
unit string
)
switch {
case val%year == 0:
value = int(val / year)
unit = "y"
case val%week == 0:
value = int(val / week)
unit = "w"
case val%day == 0:
value = int(val / day)
unit = "d"
case val%time.Hour == 0:
value = int(val / time.Hour)
unit = "h"
case val%time.Minute == 0:
value = int(val / time.Minute)
unit = "m"
case val%time.Second == 0:
value = int(val / time.Second)
unit = "s"
default:
value = int(val / time.Millisecond)
unit = "ms"
}
return zap.String(key, fmt.Sprintf("%d%s", value, unit))
}

logger/style_guide.md (new file, 192 additions)
View File

@ -0,0 +1,192 @@
# Logging Style Guide
The intention of logging is to give the administrator insight into how
the server is running and to notify them of any problems, or potential
problems, with the system.
At the moment, log level filtering is the only way to configure
logging in InfluxDB. For operational clarity, adding a log message and
choosing its level should be done according to the guidelines in this
document. The available log levels are:
* Error
* Warn
* Info
* Debug
InfluxDB uses structured logging. With structured logging, messages
carry their context as discrete, easily read key/value data about the
state of the system rather than embedding it in the message text. A
structured log message is composed of:
* Time
* Level
* Message
* (Optionally) Additional context
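For example, using the zap-based `logger` package this commit touches, a single call supplies all four parts (the shard number and error below are made up for illustration):
```go
package main

import (
	"errors"
	"os"

	"github.com/influxdata/influxdb/logger"
	"go.uber.org/zap"
)

func main() {
	// The logger supplies the time and the level automatically.
	log := logger.New(os.Stderr)

	// The call site supplies the constant message and the context.
	log.Info("Write failed", // message
		zap.Uint64("shard", 15),                    // context
		zap.Error(errors.New("shard is disabled"))) // context
}
```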
## Guidelines
**Log messages** should be simple statements or phrases that begin with
a capital letter but have no punctuation at the end. The message should
be a constant so that every occurrence is easily identified and can be
filtered for without regular expressions.
Any **dynamic content** should be expressed through the context: the
key should be a constant, and the value should carry the dynamic
content.
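As a sketch of the difference, using hypothetical wrapper functions around one of the call sites updated in this commit:
```go
package example

import (
	"fmt"

	"go.uber.org/zap"
)

// Avoid: the message text changes with every shard and error, so it
// cannot be identified or filtered for without a regular expression.
func logWriteFailedOld(log *zap.Logger, shard uint64, err error) {
	log.Info(fmt.Sprintf("write failed for shard %d: %v", shard, err))
}

// Prefer: a constant message; the dynamic content lives in the context.
func logWriteFailedNew(log *zap.Logger, shard uint64, err error) {
	log.Info("Write failed", zap.Uint64("shard", shard), zap.Error(err))
}
```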
Do not log messages in tight loops or other performance-critical
locations; doing so will likely create a performance problem.
## Naming Conventions
If the log encoding format uses keys for the time, message, or level,
the key names should be `ts` for time, `msg` for the message, and
`lvl` for the level.
If the log encoding format does not use keys for the time, message, or
level and instead outputs them some other way, this guideline can be
ignored. The logfmt and json output formats both use keys when encoding
these values.
### Context Key Names
The key for the dynamic content in the context should be formatted in
`snake_case`. The key should be completely lower case.
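Put together, a message encoded with logfmt under these conventions might render like this (values are hypothetical):
```
ts=2018-02-15T16:47:08.000000Z lvl=info msg="Write failed" shard=15 error="shard is disabled"
```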
## Levels
As a reminder, levels are usually the only way to configure what is
logged. There are four available logging levels.
* Error
* Warn
* Info
* Debug
It is important to choose the right logging level so that log messages
are useful for end users to act on.
In general, when considering which log level to use, default to
**info**. If you are considering another level, read the expanded
descriptions below to determine which level your message belongs in.
### Error
The **error** level is intended to communicate that there is a serious
problem with the server. **An error should be emitted only when an
on-call engineer can take some action to remedy the situation _and_ the
system cannot continue operating properly without remedying the
situation.**
An example of what may qualify as an error-level message is a failure to
create the internal storage for the monitor service. For that service to
function at all, a database must be created; if no database is created,
the service itself cannot function. The error has a clear, actionable
solution: figure out why the database isn't being created and create it.
An example of what does not qualify as an error is failing to parse a
query or a socket closing prematurely. Both of these usually indicate
some kind of user error rather than system error. Both are ephemeral
errors and they would not be clearly actionable to an administrator who
was paged at 3 AM. Both of these are examples of logging messages that
should be emitted at the info level with an error key rather than being
logged at the error level.
Logged errors **must not propagate**. Propagating the error risks
logging it in multiple locations and confusing users when the same error
is reported multiple times. In general, if you are returning an error,
never log at any level. By returning the error, you are telling the
parent function to handle the error. Logging a message at any level is
handling the error.
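A minimal sketch of the two mutually exclusive options; `loadConfig` is a hypothetical helper, while `removePIDFile` mirrors a call site updated in this commit:
```go
package example

import (
	"fmt"
	"os"

	"go.uber.org/zap"
)

// Propagating: the caller decides how to handle (and perhaps log) the
// error, so this function must not log it as well.
func loadConfig(path string) (*os.File, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("unable to open config: %s", err)
	}
	return f, nil
}

// Handling: logging the error is the handling, so the error stops here
// and is not also returned to the caller.
func removePIDFile(log *zap.Logger, pidfile string) {
	if err := os.Remove(pidfile); err != nil {
		log.Error("Unable to remove pidfile", zap.Error(err))
	}
}
```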
This logging level should be used very rarely and any messages that
use it should not repeat frequently. Assume that anything logged at
error will page someone in the middle of the night.
### Warn
The **warn** level is intended to communicate that there is likely to be
a serious problem with the server if it is not addressed. **A warning
should be emitted only when a support engineer can take some action to
remedy the situation _and_ the system may not continue operating
properly in the near future without remedying the situation.**
An example of what may qualify as a warning is the `max-values-per-tag`
setting. If the server starts to approach the maximum number of values,
it may stop being able to function properly once the maximum is
reached.
An example of what does not qualify as a warning is the
`log-queries-after` setting. While the message is "warning" that a query
was running for a long period of time, it is not clearly actionable and
does not indicate that the server will fail in the near future. This
should be logged at the info level instead.
This logging level should be used very rarely and any messages that
use it should not repeat frequently. Assume that anything logged at
warn will page someone in the middle of the night, but may be ignored
until normal working hours.
### Info
The **info** level should be used for almost anything. If you are not
sure which logging level to use, use info. Temporary or user errors and
any informational messages for administrators should be logged at this
level. Info-level messages should be safe for an administrator to
discard if they really want to, but most people will run the system at
the info level.
### Debug
The **debug** level exists for messages that are useful only when
debugging a misbehaving instance.
This level should rarely, if ever, be used. If you intend to use this
level, please have a rationale ready. Most messages that could be
considered debug either shouldn't exist or should be logged at the info
level. Debug messages will be suppressed by default.
## Value Formatting
Formatting for strings, integers, and other standard values is usually
determined by the log format itself and is left unspecified here. The
following formatting choices are for data types that could be output in
multiple ways.
### Time
Time values should be encoded using RFC3339 with microsecond precision.
The string should be padded to the same number of digits every time so
that times are easier to read as a column.
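This is exactly what the `TimeFormat` constant added to the `logger` package in this commit encodes; a minimal sketch of its effect:
```go
package main

import (
	"fmt"
	"time"
)

// TimeFormat is RFC3339 padded to a fixed six digits of sub-second
// (microsecond) precision, as added to the logger package.
const TimeFormat = "2006-01-02T15:04:05.000000Z07:00"

func main() {
	ts := time.Date(2018, 2, 15, 16, 47, 8, 1500, time.UTC)
	fmt.Println(ts.Format(TimeFormat)) // 2018-02-15T16:47:08.000001Z
}
```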
### Duration
Duration values that track a dynamic period of time should be output in
milliseconds with microsecond precision. The microseconds should be in
decimal form with three decimal places. Durations that denote a static
period of time should be output as a single number with a suffix for
the largest possible unit that doesn't cause the value to be a decimal.
There are two types of durations.
* One tracks a (usually small) period of time and is meant for timing
how long something takes. Its content is dynamic and may be graphed.
* The other is a duration literal whose content is static, is unlikely
to be graphed, and usually comes from some type of configuration.
If the content is dynamic, the duration should be printed as a number of
milliseconds with a decimal indicating the number of microseconds.
Anything smaller than a microsecond should be truncated. Exactly three
digits should always be printed after the decimal point.
If the content is static, the duration should be printed with a single
number and a suffix indicating the unit in years (`y`), weeks (`w`),
days (`d`), hours (`h`), minutes (`m`), seconds (`s`), or
milliseconds (`ms`). The suffix should be the greatest unit that can be
used without truncating the value. As an example, if the duration is
60 minutes, then `1h` should be used. If the duration is 61 minutes,
then `61m` should be used.
For a static duration, anything smaller than a millisecond should be
truncated. A value of zero should be shown as `0s`.
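The `DurationLiteral` helper added to the `logger` package in this commit implements the static case, and the encoder's millisecond formatting handles the dynamic case. A sketch of the literal form, mirroring the precreation service's start-up message:
```go
package main

import (
	"os"
	"time"

	"github.com/influxdata/influxdb/logger"
)

func main() {
	log := logger.New(os.Stderr)

	// Static durations render with the largest non-truncating unit:
	// 10m and 30m here; 60 minutes would render as 1h, zero as 0s.
	log.Info("Starting precreation service",
		logger.DurationLiteral("check_interval", 10*time.Minute),
		logger.DurationLiteral("advance_period", 30*time.Minute))
}
```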

View File

@ -13,6 +13,7 @@ import (
"sync"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/services/meta"
@ -98,7 +99,7 @@ func (m *Monitor) Open() error {
return nil
}
m.Logger.Info("Starting monitor system")
m.Logger.Info("Starting monitor service")
// Self-register various stats and diagnostics.
m.RegisterDiagnosticsClient("build", &build{
@ -149,7 +150,7 @@ func (m *Monitor) writePoints(p models.Points) error {
defer m.mu.RUnlock()
if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, p); err != nil {
m.Logger.Info(fmt.Sprintf("failed to store statistics: %s", err))
m.Logger.Info("failed to store statistics", zap.Error(err))
}
return nil
}
@ -157,11 +158,11 @@ func (m *Monitor) writePoints(p models.Points) error {
// Close closes the monitor system.
func (m *Monitor) Close() error {
if !m.open() {
m.Logger.Info("Monitor is already closed.")
m.Logger.Info("Monitor is already closed")
return nil
}
m.Logger.Info("shutting down monitor system")
m.Logger.Info("Shutting down monitor service")
m.mu.Lock()
close(m.done)
m.mu.Unlock()
@ -220,7 +221,7 @@ func (m *Monitor) RegisterDiagnosticsClient(name string, client diagnostics.Clie
m.mu.Lock()
defer m.mu.Unlock()
m.diagRegistrations[name] = client
m.Logger.Info("registered for diagnostics monitoring", zap.String("name", name))
m.Logger.Info("Registered diagnostics client", zap.String("name", name))
}
// DeregisterDiagnosticsClient deregisters a diagnostics client by name.
@ -389,8 +390,7 @@ func (m *Monitor) createInternalStorage() {
}
if _, err := m.MetaClient.CreateDatabaseWithRetentionPolicy(m.storeDatabase, &spec); err != nil {
m.Logger.Info(fmt.Sprintf("failed to create database '%s', failed to create storage: %s",
m.storeDatabase, err.Error()))
m.Logger.Info("Failed to create storage", zap.String("db", m.storeDatabase), zap.Error(err))
return
}
}
@ -417,8 +417,7 @@ func (m *Monitor) waitUntilInterval(d time.Duration) error {
// storeStatistics writes the statistics to an InfluxDB system.
func (m *Monitor) storeStatistics() {
defer m.wg.Done()
m.Logger.Info(fmt.Sprintf("Storing statistics in database '%s' retention policy '%s', at interval %s",
m.storeDatabase, m.storeRetentionPolicy, m.storeInterval))
m.Logger.Info("Storing statistics", zap.String("db", m.storeDatabase), zap.String("rp", m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval))
// Wait until an even interval to start recording monitor statistics.
// If we are interrupted before the interval for some reason, exit early.
@ -441,7 +440,7 @@ func (m *Monitor) storeStatistics() {
stats, err := m.Statistics(m.globalTags)
if err != nil {
m.Logger.Info(fmt.Sprintf("failed to retrieve registered statistics: %s", err))
m.Logger.Info("Failed to retrieve registered statistics", zap.Error(err))
return
}
@ -450,7 +449,7 @@ func (m *Monitor) storeStatistics() {
for _, s := range stats {
pt, err := models.NewPoint(s.Name, models.NewTags(s.Tags), s.Values, now)
if err != nil {
m.Logger.Info(fmt.Sprintf("Dropping point %v: %v", s.Name, err))
m.Logger.Info("Dropping point", zap.String("name", s.Name), zap.Error(err))
return
}
batch = append(batch, pt)
@ -466,7 +465,7 @@ func (m *Monitor) storeStatistics() {
m.writePoints(batch)
}
case <-m.done:
m.Logger.Info(fmt.Sprintf("terminating storage of statistics"))
m.Logger.Info("Terminating storage of statistics")
return
}
}

View File

@ -61,7 +61,7 @@ func TestMonitor_SetPointsWriter_StoreEnabled(t *testing.T) {
defer s.Close()
// Verify that the monitor was opened by looking at the log messages.
if logs.FilterMessage("Starting monitor system").Len() == 0 {
if logs.FilterMessage("Starting monitor service").Len() == 0 {
t.Errorf("monitor system was never started")
}
}

View File

@ -123,7 +123,8 @@ func (s *Service) Open() error {
readdir = func(path string) {
files, err := ioutil.ReadDir(path)
if err != nil {
s.Logger.Info(fmt.Sprintf("Unable to read directory %s: %s", path, err))
s.Logger.Info("Unable to read directory",
zap.String("path", path), zap.Error(err))
return
}
@ -134,10 +135,10 @@ func (s *Service) Open() error {
continue
}
s.Logger.Info(fmt.Sprintf("Loading %s", fullpath))
s.Logger.Info("Loading types from file", zap.String("path", fullpath))
types, err := TypesDBFile(fullpath)
if err != nil {
s.Logger.Info(fmt.Sprintf("Unable to parse collectd types file: %s", f.Name()))
s.Logger.Info("Unable to parse collectd types file", zap.String("path", f.Name()))
continue
}
@ -147,7 +148,7 @@ func (s *Service) Open() error {
readdir(s.Config.TypesDB)
s.popts.TypesDB = alltypesdb
} else {
s.Logger.Info(fmt.Sprintf("Loading %s", s.Config.TypesDB))
s.Logger.Info("Loading types from file", zap.String("path", s.Config.TypesDB))
types, err := TypesDBFile(s.Config.TypesDB)
if err != nil {
return fmt.Errorf("Open(): %s", err)
@ -194,7 +195,7 @@ func (s *Service) Open() error {
}
s.conn = conn
s.Logger.Info(fmt.Sprint("Listening on UDP: ", conn.LocalAddr().String()))
s.Logger.Info("Listening on UDP", zap.Stringer("addr", conn.LocalAddr()))
// Start the points batcher.
s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration))
@ -241,7 +242,7 @@ func (s *Service) Close() error {
s.conn = nil
s.batcher = nil
s.Logger.Info("collectd UDP closed")
s.Logger.Info("Closed collectd service")
s.done = nil
return nil
}
@ -345,8 +346,16 @@ func (s *Service) serve() {
n, _, err := s.conn.ReadFromUDP(buffer)
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
select {
case <-s.done:
return
default:
// The socket wasn't closed by us so consider it an error.
}
}
atomic.AddInt64(&s.stats.ReadFail, 1)
s.Logger.Info(fmt.Sprintf("collectd ReadFromUDP error: %s", err))
s.Logger.Info("ReadFromUDP error", zap.Error(err))
continue
}
if n > 0 {
@ -360,7 +369,7 @@ func (s *Service) handleMessage(buffer []byte) {
valueLists, err := network.Parse(buffer, s.popts)
if err != nil {
atomic.AddInt64(&s.stats.PointsParseFail, 1)
s.Logger.Info(fmt.Sprintf("Collectd parse error: %s", err))
s.Logger.Info("collectd parse error", zap.Error(err))
return
}
var points []models.Point
@ -385,7 +394,8 @@ func (s *Service) writePoints() {
case batch := <-s.batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Info(fmt.Sprintf("Required database %s not yet created: %s", s.Config.Database, err.Error()))
s.Logger.Info("Required database not yet created",
zap.String("db", s.Config.Database), zap.Error(err))
continue
}
@ -393,7 +403,8 @@ func (s *Service) writePoints() {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.Logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.Config.Database, err))
s.Logger.Info("Failed to write point batch to database",
zap.String("db", s.Config.Database), zap.Error(err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}
}
@ -439,7 +450,7 @@ func (s *Service) UnmarshalValueListPacked(vl *api.ValueList) []models.Point {
// Drop invalid points
p, err := models.NewPoint(name, models.NewTags(tags), fields, timestamp)
if err != nil {
s.Logger.Info(fmt.Sprintf("Dropping point %v: %v", name, err))
s.Logger.Info("Dropping point", zap.String("name", name), zap.Error(err))
atomic.AddInt64(&s.stats.InvalidDroppedPoints, 1)
return nil
}
@ -483,7 +494,7 @@ func (s *Service) UnmarshalValueList(vl *api.ValueList) []models.Point {
// Drop invalid points
p, err := models.NewPoint(name, models.NewTags(tags), fields, timestamp)
if err != nil {
s.Logger.Info(fmt.Sprintf("Dropping point %v: %v", name, err))
s.Logger.Info("Dropping point", zap.String("name", name), zap.Error(err))
atomic.AddInt64(&s.stats.InvalidDroppedPoints, 1)
continue
}

View File

@ -9,6 +9,7 @@ import (
"sync/atomic"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/meta"
@ -213,14 +214,14 @@ func (s *Service) backgroundLoop() {
for {
select {
case <-s.stop:
s.Logger.Info("continuous query service terminating")
s.Logger.Info("Terminating continuous query service")
return
case req := <-s.RunCh:
if !s.hasContinuousQueries() {
continue
}
if _, err := s.MetaClient.AcquireLease(leaseName); err == nil {
s.Logger.Info(fmt.Sprintf("running continuous queries by request for time: %v", req.Now))
s.Logger.Info("Running continuous queries by request", zap.Time("at", req.Now))
s.runContinuousQueries(req)
}
case <-t.C:
@ -261,7 +262,7 @@ func (s *Service) runContinuousQueries(req *RunRequest) {
continue
}
if ok, err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil {
s.Logger.Info(fmt.Sprintf("error executing query: %s: err = %s", cq.Query, err))
s.Logger.Info("Error executing query", zap.String("query", cq.Query), zap.Error(err))
atomic.AddInt64(&s.stats.QueryFail, 1)
} else if ok {
atomic.AddInt64(&s.stats.QueryOK, 1)
@ -356,8 +357,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
}
if err := cq.q.SetTimeRange(startTime, endTime); err != nil {
s.Logger.Info(fmt.Sprintf("error setting time range: %s", err))
return false, err
return false, fmt.Errorf("unable to set time range: %s", err)
}
var start time.Time
@ -366,13 +366,15 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
}
if s.loggingEnabled {
s.Logger.Info(fmt.Sprintf("executing continuous query %s (%v to %v)", cq.Info.Name, startTime, endTime))
s.Logger.Info("Executing continuous query",
zap.String("name", cq.Info.Name),
zap.Time("start", startTime),
zap.Time("end", endTime))
}
// Do the actual processing of the query & writing of results.
res := s.runContinuousQueryAndWriteResult(cq)
if res.Err != nil {
s.Logger.Info(fmt.Sprintf("error: %s. running: %s", res.Err, cq.q.String()))
return false, res.Err
}
@ -389,7 +391,12 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
}
if s.loggingEnabled {
s.Logger.Info(fmt.Sprintf("finished continuous query %s, %d points(s) written (%v to %v) in %s", cq.Info.Name, written, startTime, endTime, execDuration))
s.Logger.Info("Finished continuous query",
zap.String("name", cq.Info.Name),
zap.Int64("written", written),
zap.Time("start", startTime),
zap.Time("end", endTime),
logger.DurationLiteral("duration", execDuration))
}
if s.queryStatsEnabled && s.Monitor.Enabled() {

View File

@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/services/meta"
@ -133,7 +134,9 @@ func (s *Service) Open() error {
}
s.done = make(chan struct{})
s.logger.Info(fmt.Sprintf("Starting graphite service, batch size %d, batch timeout %s", s.batchSize, s.batchTimeout))
s.logger.Info("Starting graphite service",
zap.Int("batch_size", s.batchSize),
logger.DurationLiteral("batch_timeout", s.batchTimeout))
// Register diagnostics if a Monitor service is available.
if s.Monitor != nil {
@ -159,9 +162,12 @@ func (s *Service) Open() error {
return err
}
s.logger.Info(fmt.Sprintf("Listening on %s: %s", strings.ToUpper(s.protocol), s.addr.String()))
s.logger.Info("Listening",
zap.String("protocol", s.protocol),
zap.Stringer("addr", s.addr))
return nil
}
func (s *Service) closeAllConnections() {
s.tcpConnectionsMu.Lock()
defer s.tcpConnectionsMu.Unlock()
@ -317,11 +323,11 @@ func (s *Service) openTCPServer() (net.Addr, error) {
for {
conn, err := s.ln.Accept()
if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
s.logger.Info("graphite TCP listener closed")
s.logger.Info("Graphite TCP listener closed")
return
}
if err != nil {
s.logger.Info("error accepting TCP connection", zap.Error(err))
s.logger.Info("Error accepting TCP connection", zap.Error(err))
continue
}
@ -432,7 +438,7 @@ func (s *Service) handleLine(line string) {
return
}
}
s.logger.Info(fmt.Sprintf("unable to parse line: %s: %s", line, err))
s.logger.Info("Unable to parse line", zap.String("line", line), zap.Error(err))
atomic.AddInt64(&s.stats.PointsParseFail, 1)
return
}
@ -448,7 +454,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
case batch := <-batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.logger.Info(fmt.Sprintf("Required database or retention policy do not yet exist: %s", err.Error()))
s.logger.Info("Required database or retention policy do not yet exist", zap.Error(err))
continue
}
@ -456,7 +462,8 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.database, err))
s.logger.Info("Failed to write point batch to database",
zap.String("db", s.database), zap.Error(err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}

View File

@ -392,7 +392,10 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U
if h.Config.AuthEnabled {
if err := h.QueryAuthorizer.AuthorizeQuery(user, q, db); err != nil {
if err, ok := err.(meta.ErrAuthorize); ok {
h.Logger.Info(fmt.Sprintf("Unauthorized request | user: %q | query: %q | database %q", err.User, err.Query.String(), err.Database))
h.Logger.Info("Unauthorized request",
zap.String("user", err.User),
zap.Stringer("query", err.Query),
zap.String("db", err.Database))
}
h.httpError(rw, "error authorizing query: "+err.Error(), http.StatusForbidden)
return
@ -598,7 +601,9 @@ func (h *Handler) async(q *influxql.Query, results <-chan *query.Result) {
if r.Err == query.ErrNotExecuted {
continue
}
h.Logger.Info(fmt.Sprintf("error while running async query: %s: %s", q, r.Err))
h.Logger.Info("Error while running async query",
zap.Stringer("query", q),
zap.Error(r.Err))
}
}
}
@ -681,7 +686,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U
atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len()))
if h.Config.WriteTracing {
h.Logger.Info(fmt.Sprintf("Write body received by handler: %s", buf.Bytes()))
h.Logger.Info("Write body received by handler", zap.ByteString("body", buf.Bytes()))
}
points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
@ -851,7 +856,7 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me
atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len()))
if h.Config.WriteTracing {
h.Logger.Info(fmt.Sprintf("Prom write body received by handler: %s", buf.Bytes()))
h.Logger.Info("Prom write body received by handler", zap.ByteString("body", buf.Bytes()))
}
reqBuf, err := snappy.Decode(nil, buf.Bytes())
@ -870,7 +875,7 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me
points, err := prometheus.WriteRequestToPoints(&req)
if err != nil {
if h.Config.WriteTracing {
h.Logger.Info(fmt.Sprintf("Prom write handler: %s", err.Error()))
h.Logger.Info("Prom write handler", zap.Error(err))
}
if err != prometheus.ErrNaNDropped {
@ -947,7 +952,10 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
if h.Config.AuthEnabled {
if err := h.QueryAuthorizer.AuthorizeQuery(user, q, db); err != nil {
if err, ok := err.(meta.ErrAuthorize); ok {
h.Logger.Info(fmt.Sprintf("Unauthorized request | user: %q | query: %q | database %q", err.User, err.Query.String(), err.Database))
h.Logger.Info("Unauthorized request",
zap.String("user", err.User),
zap.Stringer("query", err.Query),
zap.String("db", err.Database))
}
h.httpError(w, "error authorizing query: "+err.Error(), http.StatusForbidden)
return

View File

@ -86,8 +86,7 @@ func NewService(c Config) *Service {
// Open starts the service.
func (s *Service) Open() error {
s.Logger.Info("Starting HTTP service")
s.Logger.Info(fmt.Sprint("Authentication enabled:", s.Handler.Config.AuthEnabled))
s.Logger.Info("Starting HTTP service", zap.Bool("authentication", s.Handler.Config.AuthEnabled))
// Open listener.
if s.https {
@ -103,7 +102,6 @@ func (s *Service) Open() error {
return err
}
s.Logger.Info(fmt.Sprint("Listening on HTTPS:", listener.Addr().String()))
s.ln = listener
} else {
listener, err := net.Listen("tcp", s.addr)
@ -111,9 +109,11 @@ func (s *Service) Open() error {
return err
}
s.Logger.Info(fmt.Sprint("Listening on HTTP:", listener.Addr().String()))
s.ln = listener
}
s.Logger.Info("Listening on HTTP",
zap.Stringer("addr", s.ln.Addr()),
zap.Bool("https", s.https))
// Open unix socket listener.
if s.unixSocket {
@ -132,7 +132,8 @@ func (s *Service) Open() error {
return err
}
s.Logger.Info(fmt.Sprint("Listening on unix socket:", listener.Addr().String()))
s.Logger.Info("Listening on unix socket",
zap.Stringer("addr", listener.Addr()))
s.unixSocketListener = listener
go s.serveUnixSocket()

View File

@ -7,7 +7,6 @@ import (
crand "crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
@ -800,16 +799,23 @@ func (c *Client) PrecreateShardGroups(from, to time.Time) error {
nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
// if it already exists, continue
if sg, _ := data.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); sg != nil {
c.logger.Info(fmt.Sprintf("shard group %d exists for database %s, retention policy %s", sg.ID, di.Name, rp.Name))
c.logger.Info("Shard group already exists",
zap.Uint64("id", sg.ID),
zap.String("db", di.Name),
zap.String("rp", rp.Name))
continue
}
newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime)
if err != nil {
c.logger.Info(fmt.Sprintf("failed to precreate successive shard group for group %d: %s", g.ID, err.Error()))
c.logger.Info("Failed to precreate successive shard group",
zap.Uint64("group_id", g.ID), zap.Error(err))
continue
}
changed = true
c.logger.Info(fmt.Sprintf("new shard group %d successfully precreated for database %s, retention policy %s", newGroup.ID, di.Name, rp.Name))
c.logger.Info("New shard group successfully precreated",
zap.Uint64("group_id", newGroup.ID),
zap.String("db", di.Name),
zap.String("rp", rp.Name))
}
}
}

View File

@ -5,7 +5,6 @@ import (
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
@ -116,7 +115,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
pt, err := models.NewPoint(p.Metric, models.NewTags(p.Tags), map[string]interface{}{"value": p.Value}, ts)
if err != nil {
h.Logger.Info(fmt.Sprintf("Dropping point %v: %v", p.Metric, err))
h.Logger.Info("Dropping point", zap.String("name", p.Metric), zap.Error(err))
if h.stats != nil {
atomic.AddInt64(&h.stats.InvalidDroppedPoints, 1)
}
@ -127,11 +126,11 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
// Write points.
if err := h.PointsWriter.WritePointsPrivileged(h.Database, h.RetentionPolicy, models.ConsistencyLevelAny, points); influxdb.IsClientError(err) {
h.Logger.Info(fmt.Sprint("write series error: ", err))
h.Logger.Info("Write series error", zap.Error(err))
http.Error(w, "write series error: "+err.Error(), http.StatusBadRequest)
return
} else if err != nil {
h.Logger.Info(fmt.Sprint("write series error: ", err))
h.Logger.Info("Write series error", zap.Error(err))
http.Error(w, "write series error: "+err.Error(), http.StatusInternalServerError)
return
}

View File

@ -5,7 +5,6 @@ import (
"bufio"
"bytes"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
@ -134,7 +133,6 @@ func (s *Service) Open() error {
return err
}
s.Logger.Info(fmt.Sprint("Listening on TLS: ", listener.Addr().String()))
s.ln = listener
} else {
listener, err := net.Listen("tcp", s.BindAddress)
@ -142,9 +140,11 @@ func (s *Service) Open() error {
return err
}
s.Logger.Info(fmt.Sprint("Listening on: ", listener.Addr().String()))
s.ln = listener
}
s.Logger.Info("Listening on TCP",
zap.Stringer("addr", s.ln.Addr()),
zap.Bool("tls", s.tls))
s.httpln = newChanListener(s.ln.Addr())
// Begin listening for connections.
@ -294,10 +294,10 @@ func (s *Service) serve() {
// Wait for next connection.
conn, err := s.ln.Accept()
if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
s.Logger.Info("openTSDB TCP listener closed")
s.Logger.Info("OpenTSDB TCP listener closed")
return
} else if err != nil {
s.Logger.Info(fmt.Sprint("error accepting openTSDB: ", err.Error()))
s.Logger.Info("Error accepting OpenTSDB", zap.Error(err))
continue
}
@ -355,7 +355,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if err != nil {
if err != io.EOF {
atomic.AddInt64(&s.stats.TelnetReadError, 1)
s.Logger.Info(fmt.Sprint("error reading from openTSDB connection ", err.Error()))
s.Logger.Info("Error reading from OpenTSDB connection", zap.Error(err))
}
return
}
@ -372,7 +372,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if len(inputStrs) < 4 || inputStrs[0] != "put" {
atomic.AddInt64(&s.stats.TelnetBadLine, 1)
if s.LogPointErrors {
s.Logger.Info(fmt.Sprintf("malformed line '%s' from %s", line, remoteAddr))
s.Logger.Info("Malformed line", zap.String("line", line), zap.String("remote_addr", remoteAddr))
}
continue
}
@ -387,7 +387,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if err != nil {
atomic.AddInt64(&s.stats.TelnetBadTime, 1)
if s.LogPointErrors {
s.Logger.Info(fmt.Sprintf("malformed time '%s' from %s", tsStr, remoteAddr))
s.Logger.Info("Malformed time", zap.String("time", tsStr), zap.String("remote_addr", remoteAddr))
}
}
@ -399,7 +399,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
default:
atomic.AddInt64(&s.stats.TelnetBadTime, 1)
if s.LogPointErrors {
s.Logger.Info(fmt.Sprintf("bad time '%s' must be 10 or 13 chars, from %s ", tsStr, remoteAddr))
s.Logger.Info("Time must be 10 or 13 chars", zap.String("time", tsStr), zap.String("remote_addr", remoteAddr))
}
continue
}
@ -410,7 +410,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
atomic.AddInt64(&s.stats.TelnetBadTag, 1)
if s.LogPointErrors {
s.Logger.Info(fmt.Sprintf("malformed tag data '%v' from %s", tagStrs[t], remoteAddr))
s.Logger.Info("Malformed tag data", zap.String("tag", tagStrs[t]), zap.String("remote_addr", remoteAddr))
}
continue
}
@ -424,7 +424,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if err != nil {
atomic.AddInt64(&s.stats.TelnetBadFloat, 1)
if s.LogPointErrors {
s.Logger.Info(fmt.Sprintf("bad float '%s' from %s", valueStr, remoteAddr))
s.Logger.Info("Bad float", zap.String("value", valueStr), zap.String("remote_addr", remoteAddr))
}
continue
}
@ -434,7 +434,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if err != nil {
atomic.AddInt64(&s.stats.TelnetBadFloat, 1)
if s.LogPointErrors {
s.Logger.Info(fmt.Sprintf("bad float '%s' from %s", valueStr, remoteAddr))
s.Logger.Info("Bad float", zap.String("value", valueStr), zap.String("remote_addr", remoteAddr))
}
continue
}
@ -464,7 +464,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
case batch := <-batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Info(fmt.Sprintf("Required database %s does not yet exist: %s", s.Database, err.Error()))
s.Logger.Info("Required database does not yet exist", zap.String("db", s.Database), zap.Error(err))
continue
}
@ -472,7 +472,8 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.Logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.Database, err))
s.Logger.Info("Failed to write point batch to database",
zap.String("db", s.Database), zap.Error(err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}
}

View File

@ -2,10 +2,10 @@
package precreator // import "github.com/influxdata/influxdb/services/precreator"
import (
"fmt"
"sync"
"time"
"github.com/influxdata/influxdb/logger"
"go.uber.org/zap"
)
@ -44,8 +44,9 @@ func (s *Service) Open() error {
return nil
}
s.Logger.Info(fmt.Sprintf("Starting precreation service with check interval of %s, advance period of %s",
s.checkInterval, s.advancePeriod))
s.Logger.Info("Starting precreation service",
logger.DurationLiteral("check_interval", s.checkInterval),
logger.DurationLiteral("advance_period", s.advancePeriod))
s.done = make(chan struct{})
@ -75,10 +76,10 @@ func (s *Service) runPrecreation() {
select {
case <-time.After(s.checkInterval):
if err := s.precreate(time.Now().UTC()); err != nil {
s.Logger.Info(fmt.Sprintf("failed to precreate shards: %s", err.Error()))
s.Logger.Info("Failed to precreate shards", zap.Error(err))
}
case <-s.done:
s.Logger.Info("Precreation service terminating")
s.Logger.Info("Terminating precreation service")
return
}
}

View File

@ -2,10 +2,10 @@
package retention // import "github.com/influxdata/influxdb/services/retention"
import (
"fmt"
"sync"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/services/meta"
"go.uber.org/zap"
)
@ -43,7 +43,8 @@ func (s *Service) Open() error {
return nil
}
s.logger.Info("Starting retention policy enforcement service", zap.String("check-interval", s.config.CheckInterval.String()))
s.logger.Info("Starting retention policy enforcement service",
logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
s.done = make(chan struct{})
s.wg.Add(1)
@ -57,7 +58,7 @@ func (s *Service) Close() error {
return nil
}
s.logger.Info("Retention policy enforcement service closing.")
s.logger.Info("Closing retention policy enforcement service")
close(s.done)
s.wg.Wait()
@ -79,7 +80,7 @@ func (s *Service) run() {
return
case <-ticker.C:
s.logger.Info("Retention policy shard deletion check commencing.")
s.logger.Info("Shard deletion check commencing")
type deletionInfo struct {
db string
@ -87,16 +88,29 @@ func (s *Service) run() {
}
deletedShardIDs := make(map[uint64]deletionInfo)
// Record whether an error occurred during this run so we can inform the
// user that we will retry on the next interval. Without this note, they
// may see the error message and assume they must fix things manually.
var retryNeeded bool
dbs := s.MetaClient.Databases()
for _, d := range dbs {
for _, r := range d.RetentionPolicies {
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
s.logger.Info(fmt.Sprintf("Failed to delete shard group %d from database %s, retention policy %s: %v. Retry in %v.", g.ID, d.Name, r.Name, err, s.config.CheckInterval))
s.logger.Info("Failed to delete shard group",
zap.Uint64("id", g.ID),
zap.String("db", d.Name),
zap.String("rp", r.Name),
zap.Error(err))
retryNeeded = true
continue
}
s.logger.Info(fmt.Sprintf("Deleted shard group %d from database %s, retention policy %s.", g.ID, d.Name, r.Name))
s.logger.Info("Deleted shard group",
zap.Uint64("id", g.ID),
zap.String("db", d.Name),
zap.String("rp", r.Name))
// Store all the shard IDs that may possibly need to be removed locally.
for _, sh := range g.Shards {
@ -110,15 +124,28 @@ func (s *Service) run() {
for _, id := range s.TSDBStore.ShardIDs() {
if info, ok := deletedShardIDs[id]; ok {
if err := s.TSDBStore.DeleteShard(id); err != nil {
s.logger.Error(fmt.Sprintf("Failed to delete shard ID %d from database %s, retention policy %s: %v. Will retry in %v", id, info.db, info.rp, err, s.config.CheckInterval))
s.logger.Info("Failed to delete shard",
zap.Uint64("id", id),
zap.String("db", info.db),
zap.String("rp", info.rp),
zap.Error(err))
retryNeeded = true
continue
}
s.logger.Info(fmt.Sprintf("Shard ID %d from database %s, retention policy %s, deleted.", id, info.db, info.rp))
s.logger.Info("Deleted shard",
zap.Uint64("id", id),
zap.String("db", info.db),
zap.String("rp", info.rp))
}
}
if err := s.MetaClient.PruneShardGroups(); err != nil {
s.logger.Info(fmt.Sprintf("Problem pruning shard groups: %s. Will retry in %v", err, s.config.CheckInterval))
s.logger.Info("Problem pruning shard groups", zap.Error(err))
retryNeeded = true
}
if retryNeeded {
s.logger.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
}
}
}

View File

@ -93,10 +93,10 @@ func (s *Service) serve() {
// Wait for next connection.
conn, err := s.Listener.Accept()
if err != nil && strings.Contains(err.Error(), "connection closed") {
s.Logger.Info("snapshot listener closed")
s.Logger.Info("Listener closed")
return
} else if err != nil {
s.Logger.Info(fmt.Sprint("error accepting snapshot request: ", err.Error()))
s.Logger.Info("Error accepting snapshot request", zap.Error(err))
continue
}

View File

@ -101,7 +101,7 @@ func (s *Service) Open() error {
s.waitForMetaUpdates()
}()
s.Logger.Info("opened service")
s.Logger.Info("Opened service")
return nil
}
@ -121,7 +121,7 @@ func (s *Service) Close() error {
close(s.closing)
s.wg.Wait()
s.Logger.Info("closed service")
s.Logger.Info("Closed service")
return nil
}
@ -165,7 +165,7 @@ func (s *Service) waitForMetaUpdates() {
case <-ch:
err := s.Update()
if err != nil {
s.Logger.Info(fmt.Sprint("error updating subscriptions: ", err))
s.Logger.Info("Error updating subscriptions", zap.Error(err))
}
case <-s.closing:
return
@ -296,7 +296,7 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) {
sub, err := s.createSubscription(se, si.Mode, si.Destinations)
if err != nil {
atomic.AddInt64(&s.stats.CreateFailures, 1)
s.Logger.Info(fmt.Sprintf("Subscription creation failed for '%s' with error: %s", si.Name, err))
s.Logger.Info("Subscription creation failed", zap.String("name", si.Name), zap.Error(err))
continue
}
cw := chanWriter{
@ -314,7 +314,9 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) {
}()
}
s.subs[se] = cw
s.Logger.Info(fmt.Sprintf("added new subscription for %s %s", se.db, se.rp))
s.Logger.Info("Added new subscription",
zap.String("db", se.db),
zap.String("rp", se.rp))
}
}
}
@ -327,7 +329,9 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) {
// Remove it from the set
delete(s.subs, se)
s.Logger.Info(fmt.Sprintf("deleted old subscription for %s %s", se.db, se.rp))
s.Logger.Info("Deleted old subscription",
zap.String("db", se.db),
zap.String("rp", se.rp))
}
}
}
@ -341,7 +345,7 @@ func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) {
return NewHTTP(u.String(), time.Duration(s.conf.HTTPTimeout))
case "https":
if s.conf.InsecureSkipVerify {
s.Logger.Info("WARNING: 'insecure-skip-verify' is true. This will skip all certificate verifications.")
s.Logger.Warn("'insecure-skip-verify' is true. This will skip all certificate verifications.")
}
return NewHTTPS(u.String(), time.Duration(s.conf.HTTPTimeout), s.conf.InsecureSkipVerify, s.conf.CaCerts)
default:

View File

@ -3,7 +3,6 @@ package udp // import "github.com/influxdata/influxdb/services/udp"
import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
@ -92,28 +91,30 @@ func (s *Service) Open() (err error) {
s.addr, err = net.ResolveUDPAddr("udp", s.config.BindAddress)
if err != nil {
s.Logger.Info(fmt.Sprintf("Failed to resolve UDP address %s: %s", s.config.BindAddress, err))
s.Logger.Info("Failed to resolve UDP address",
zap.String("bind_address", s.config.BindAddress), zap.Error(err))
return err
}
s.conn, err = net.ListenUDP("udp", s.addr)
if err != nil {
s.Logger.Info(fmt.Sprintf("Failed to set up UDP listener at address %s: %s", s.addr, err))
s.Logger.Info("Failed to set up UDP listener",
zap.Stringer("addr", s.addr), zap.Error(err))
return err
}
if s.config.ReadBuffer != 0 {
err = s.conn.SetReadBuffer(s.config.ReadBuffer)
if err != nil {
s.Logger.Info(fmt.Sprintf("Failed to set UDP read buffer to %d: %s",
s.config.ReadBuffer, err))
s.Logger.Info("Failed to set UDP read buffer",
zap.Int("buffer_size", s.config.ReadBuffer), zap.Error(err))
return err
}
}
s.batcher = tsdb.NewPointBatcher(s.config.BatchSize, s.config.BatchPending, time.Duration(s.config.BatchTimeout))
s.batcher.Start()
s.Logger.Info(fmt.Sprintf("Started listening on UDP: %s", s.config.BindAddress))
s.Logger.Info("Started listening on UDP", zap.String("addr", s.config.BindAddress))
s.wg.Add(3)
go s.serve()
@ -159,7 +160,8 @@ func (s *Service) writer() {
case batch := <-s.batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Info(fmt.Sprintf("Required database %s does not yet exist: %s", s.config.Database, err.Error()))
s.Logger.Info("Required database does not yet exist",
zap.String("db", s.config.Database), zap.Error(err))
continue
}
@ -167,7 +169,8 @@ func (s *Service) writer() {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.Logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.config.Database, err))
s.Logger.Info("Failed to write point batch to database",
zap.String("db", s.config.Database), zap.Error(err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}
@ -191,7 +194,7 @@ func (s *Service) serve() {
n, _, err := s.conn.ReadFromUDP(buf)
if err != nil {
atomic.AddInt64(&s.stats.ReadFail, 1)
s.Logger.Info(fmt.Sprintf("Failed to read UDP message: %s", err))
s.Logger.Info("Failed to read UDP message", zap.Error(err))
continue
}
atomic.AddInt64(&s.stats.BytesReceived, int64(n))
@ -214,7 +217,7 @@ func (s *Service) parser() {
points, err := models.ParsePointsWithPrecision(buf, time.Now().UTC(), s.config.Precision)
if err != nil {
atomic.AddInt64(&s.stats.PointsParseFail, 1)
s.Logger.Info(fmt.Sprintf("Failed to parse points: %s", err))
s.Logger.Info("Failed to parse points", zap.Error(err))
continue
}

View File

@ -680,7 +680,7 @@ func (cl *CacheLoader) Load(cache *Cache) error {
if err != nil {
return err
}
cl.Logger.Info(fmt.Sprintf("reading file %s, size %d", f.Name(), stat.Size()))
cl.Logger.Info("Reading file", zap.String("path", f.Name()), zap.Int64("size", stat.Size()))
// Nothing to read, skip it
if stat.Size() == 0 {
@ -698,7 +698,7 @@ func (cl *CacheLoader) Load(cache *Cache) error {
entry, err := r.Read()
if err != nil {
n := r.Count()
cl.Logger.Info(fmt.Sprintf("file %s corrupt at position %d, truncating", f.Name(), n))
cl.Logger.Info("File corrupt", zap.String("path", f.Name()), zap.Int64("pos", n))
if err := f.Truncate(n); err != nil {
return err
}

View File

@ -742,7 +742,8 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
if err := e.Cache.ApplyEntryFn(func(key []byte, entry *entry) error {
fieldType, err := entry.values.InfluxQLType()
if err != nil {
e.logger.Info(fmt.Sprintf("error getting the data type of values for key %s: %s", key, err.Error()))
e.logger.Info("Error getting the data type of values for key",
zap.ByteString("key", key), zap.Error(err))
}
if err := e.addToIndexFromKey(key, fieldType); err != nil {
@ -758,7 +759,8 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
return err
}
e.traceLogger.Info(fmt.Sprintf("Meta data index for shard %d loaded in %v", shardID, time.Since(now)))
e.traceLogger.Info("Meta data index for shard loaded",
zap.Uint64("id", shardID), zap.Duration("duration", time.Since(now)))
return nil
}
@ -1561,7 +1563,9 @@ func (e *Engine) WriteSnapshot() error {
defer func() {
if started != nil {
e.Cache.UpdateCompactTime(time.Since(*started))
e.logger.Info(fmt.Sprintf("Snapshot for path %s written in %v", e.path, time.Since(*started)))
e.logger.Info("Snapshot for path written",
zap.String("path", e.path),
zap.Duration("duration", time.Since(*started)))
}
}()
@ -1603,7 +1607,9 @@ func (e *Engine) WriteSnapshot() error {
// holding the engine write lock.
dedup := time.Now()
snapshot.Deduplicate()
e.traceLogger.Info(fmt.Sprintf("Snapshot for path %s deduplicated in %v", e.path, time.Since(dedup)))
e.traceLogger.Info("Snapshot for path deduplicated",
zap.String("path", e.path),
zap.Duration("duration", time.Since(dedup)))
return e.writeSnapshotAndCommit(closedFiles, snapshot)
}
@ -1637,7 +1643,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
// write the new snapshot files
newFiles, err := e.Compactor.WriteSnapshot(snapshot)
if err != nil {
e.logger.Info(fmt.Sprintf("error writing snapshot from compactor: %v", err))
e.logger.Info("Error writing snapshot from compactor", zap.Error(err))
return err
}
@ -1646,7 +1652,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
// update the file store with these new files
if err := e.FileStore.Replace(nil, newFiles); err != nil {
e.logger.Info(fmt.Sprintf("error adding new TSM files from snapshot: %v", err))
e.logger.Info("Error adding new TSM files from snapshot", zap.Error(err))
return err
}
@ -1654,7 +1660,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
e.Cache.ClearSnapshot(true)
if err := e.WAL.Remove(closedFiles); err != nil {
e.logger.Info(fmt.Sprintf("error removing closed wal segments: %v", err))
e.logger.Info("Error removing closed WAL segments", zap.Error(err))
}
return nil
@ -1677,10 +1683,10 @@ func (e *Engine) compactCache() {
e.Cache.UpdateAge()
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
start := time.Now()
e.traceLogger.Info(fmt.Sprintf("Compacting cache for %s", e.path))
e.traceLogger.Info("Compacting cache", zap.String("path", e.path))
err := e.WriteSnapshot()
if err != nil && err != errCompactionsDisabled {
e.logger.Info(fmt.Sprintf("error writing snapshot: %v", err))
e.logger.Info("Error writing snapshot", zap.Error(err))
atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
} else {
atomic.AddInt64(&e.stats.CacheCompactions, 1)
@ -1881,9 +1887,8 @@ func (e *Engine) compactFull(grp CompactionGroup, wg *sync.WaitGroup) bool {
type compactionStrategy struct {
group CompactionGroup
fast bool
description string
level int
fast bool
level int
durationStat *int64
activeStat *int64
@ -1908,9 +1913,9 @@ func (s *compactionStrategy) Apply() {
func (s *compactionStrategy) compactGroup() {
group := s.group
start := time.Now()
s.logger.Info(fmt.Sprintf("beginning %s compaction, %d TSM files", s.description, len(group)))
s.logger.Info("Beginning compaction", zap.Int("files", len(group)))
for i, f := range group {
s.logger.Info(fmt.Sprintf("compacting %s %s (#%d)", s.description, f, i))
s.logger.Info("Compacting file", zap.Int("index", i), zap.String("file", f))
}
var (
@ -1927,7 +1932,7 @@ func (s *compactionStrategy) compactGroup() {
if err != nil {
_, inProgress := err.(errCompactionInProgress)
if err == errCompactionsDisabled || inProgress {
s.logger.Info(fmt.Sprintf("aborted %s compaction. %v", s.description, err))
s.logger.Info("Aborted compaction", zap.Error(err))
if _, ok := err.(errCompactionInProgress); ok {
time.Sleep(time.Second)
@ -1935,23 +1940,26 @@ func (s *compactionStrategy) compactGroup() {
return
}
s.logger.Info(fmt.Sprintf("error compacting TSM files: %v", err))
s.logger.Info("Error compacting TSM files", zap.Error(err))
atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second)
return
}
if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil {
s.logger.Info(fmt.Sprintf("error replacing new TSM files: %v", err))
s.logger.Info("Error replacing new TSM files", zap.Error(err))
atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second)
return
}
for i, f := range files {
s.logger.Info(fmt.Sprintf("compacted %s into %s (#%d)", s.description, f, i))
s.logger.Info("Compacted file", zap.Int("index", i), zap.String("file", f))
}
s.logger.Info(fmt.Sprintf("compacted %s %d files into %d files in %s", s.description, len(group), len(files), time.Since(start)))
s.logger.Info("Finished compacting files",
zap.Int("groups", len(group)),
zap.Int("files", len(files)),
zap.Duration("duration", time.Since(start)))
atomic.AddInt64(s.successStat, 1)
}
@ -1960,14 +1968,13 @@ func (s *compactionStrategy) compactGroup() {
func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy {
return &compactionStrategy{
group: group,
logger: e.logger,
logger: e.logger.With(zap.Int("level", level), zap.String("strategy", "level")),
fileStore: e.FileStore,
compactor: e.Compactor,
fast: fast,
engine: e,
level: level,
description: fmt.Sprintf("level %d", level),
activeStat: &e.stats.TSMCompactionsActive[level-1],
successStat: &e.stats.TSMCompactions[level-1],
errorStat: &e.stats.TSMCompactionErrors[level-1],
@ -1980,7 +1987,7 @@ func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level
func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *compactionStrategy {
s := &compactionStrategy{
group: group,
logger: e.logger,
logger: e.logger.With(zap.String("strategy", "full"), zap.Bool("optimize", optimize)),
fileStore: e.FileStore,
compactor: e.Compactor,
fast: optimize,
@ -1989,13 +1996,11 @@ func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *c
}
if optimize {
s.description = "optimize"
s.activeStat = &e.stats.TSMOptimizeCompactionsActive
s.successStat = &e.stats.TSMOptimizeCompactions
s.errorStat = &e.stats.TSMOptimizeCompactionErrors
s.durationStat = &e.stats.TSMOptimizeCompactionDuration
} else {
s.description = "full"
s.activeStat = &e.stats.TSMFullCompactionsActive
s.successStat = &e.stats.TSMFullCompactions
s.errorStat = &e.stats.TSMFullCompactionErrors
@ -2027,7 +2032,8 @@ func (e *Engine) reloadCache() error {
return err
}
e.traceLogger.Info(fmt.Sprintf("Reloaded WAL cache %s in %v", e.WAL.Path(), time.Since(now)))
e.traceLogger.Info("Reloaded WAL cache",
zap.String("path", e.WAL.Path()), zap.Duration("duration", time.Since(now)))
return nil
}

View File

@ -483,7 +483,10 @@ func (f *FileStore) Open() error {
go func(idx int, file *os.File) {
start := time.Now()
df, err := NewTSMReader(file)
f.logger.Info(fmt.Sprintf("%s (#%d) opened in %v", file.Name(), idx, time.Since(start)))
f.logger.Info("Opened file",
zap.String("path", file.Name()),
zap.Int("id", idx),
zap.Duration("duration", time.Since(start)))
if err != nil {
readerC <- &res{r: df, err: fmt.Errorf("error opening memory map for file %s: %v", file.Name(), err)}
@ -926,7 +929,7 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
// CreateSnapshot creates hardlinks for all tsm and tombstone files
// in the path provided.
func (f *FileStore) CreateSnapshot() (string, error) {
f.traceLogger.Info(fmt.Sprintf("Creating snapshot in %s", f.dir))
f.traceLogger.Info("Creating snapshot", zap.String("dir", f.dir))
files := f.Files()
f.mu.Lock()
@ -1275,12 +1278,12 @@ func (p *purger) purge() {
for k, v := range p.files {
if !v.InUse() {
if err := v.Close(); err != nil {
p.logger.Info(fmt.Sprintf("purge: close file: %v", err))
p.logger.Info("Purge: close file", zap.Error(err))
continue
}
if err := v.Remove(); err != nil {
p.logger.Info(fmt.Sprintf("purge: remove file: %v", err))
p.logger.Info("Purge: remove file", zap.Error(err))
continue
}
delete(p.files, k)

View File

@ -184,8 +184,8 @@ func (l *WAL) Open() error {
l.mu.Lock()
defer l.mu.Unlock()
l.traceLogger.Info(fmt.Sprintf("tsm1 WAL starting with %d segment size", l.SegmentSize))
l.traceLogger.Info(fmt.Sprintf("tsm1 WAL writing to %s", l.path))
l.traceLogger.Info("tsm1 WAL starting", zap.Int("segment_size", l.SegmentSize))
l.traceLogger.Info("tsm1 WAL writing", zap.String("path", l.path))
if err := os.MkdirAll(l.path, 0777); err != nil {
return err
@ -350,7 +350,7 @@ func (l *WAL) Remove(files []string) error {
l.mu.Lock()
defer l.mu.Unlock()
for _, fn := range files {
l.traceLogger.Info(fmt.Sprintf("Removing %s", fn))
l.traceLogger.Info("Removing WAL file", zap.String("path", fn))
os.RemoveAll(fn)
}
@ -523,7 +523,7 @@ func (l *WAL) Close() error {
l.once.Do(func() {
// Close, but don't set to nil so future goroutines can still be signaled
l.traceLogger.Info(fmt.Sprintf("Closing %s", l.path))
l.traceLogger.Info("Closing WAL file", zap.String("path", l.path))
close(l.closing)
if l.currentSegmentWriter != nil {

View File

@ -100,13 +100,13 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic {
for _, database := range databases {
sc, err := s.SeriesCardinality(database)
if err != nil {
s.Logger.Error("cannot retrieve series cardinality", zap.Error(err))
s.Logger.Info("Cannot retrieve series cardinality", zap.Error(err))
continue
}
mc, err := s.MeasurementsCardinality(database)
if err != nil {
s.Logger.Error("cannot retrieve measurement cardinality", zap.Error(err))
s.Logger.Info("Cannot retrieve measurement cardinality", zap.Error(err))
continue
}
@ -144,7 +144,7 @@ func (s *Store) Open() error {
s.closing = make(chan struct{})
s.shards = map[uint64]*Shard{}
s.Logger.Info(fmt.Sprintf("Using data dir: %v", s.Path()))
s.Logger.Info("Using data dir", zap.String("path", s.Path()))
// Create directory.
if err := os.MkdirAll(s.path, 0777); err != nil {
@ -210,7 +210,7 @@ func (s *Store) loadShards() error {
for _, db := range dbDirs {
if !db.IsDir() {
s.Logger.Info("Not loading. Not a database directory.", zap.String("name", db.Name()))
s.Logger.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
continue
}
@ -234,7 +234,7 @@ func (s *Store) loadShards() error {
for _, rp := range rpDirs {
if !rp.IsDir() {
s.Logger.Info(fmt.Sprintf("Skipping retention policy dir: %s. Not a directory", rp.Name()))
s.Logger.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
continue
}
@ -291,7 +291,7 @@ func (s *Store) loadShards() error {
}
resC <- &res{s: shard}
s.Logger.Info(fmt.Sprintf("%s opened in %s", path, time.Since(start)))
s.Logger.Info("Opened shard", zap.String("path", path), zap.Duration("duration", time.Since(start)))
}(db.Name(), rp.Name(), sh.Name())
}
}
@ -1652,7 +1652,7 @@ func (s *Store) monitorShards() {
for _, sh := range s.shards {
if sh.IsIdle() {
if err := sh.Free(); err != nil {
s.Logger.Warn("error free cold shard resources:", zap.Error(err))
s.Logger.Warn("Error while freeing cold shard resources", zap.Error(err))
}
} else {
sh.SetCompactionsEnabled(true)
@ -1710,7 +1710,7 @@ func (s *Store) monitorShards() {
indexSet := IndexSet{Indexes: []Index{firstShardIndex}, SeriesFile: sfile}
names, err := indexSet.MeasurementNamesByExpr(nil, nil)
if err != nil {
s.Logger.Warn("cannot retrieve measurement names", zap.Error(err))
s.Logger.Warn("Cannot retrieve measurement names", zap.Error(err))
return nil
}
@ -1725,8 +1725,13 @@ func (s *Store) monitorShards() {
// Log at 80, 85, 90-100% levels
if perc == 80 || perc == 85 || perc >= 90 {
s.Logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s measurement=%s tag=%s",
perc, n, s.EngineOptions.Config.MaxValuesPerTag, db, name, k))
s.Logger.Warn("max-values-per-tag limit may be exceeded soon",
zap.String("perc", fmt.Sprintf("%d%%", perc)),
zap.Int("n", n),
zap.Int("max", s.EngineOptions.Config.MaxValuesPerTag),
zap.String("db", db),
zap.ByteString("measurement", name),
zap.ByteString("tag", k))
}
return nil
})