From 2bbd96768d9204c7cc5516ef01ac5b2efe22282f Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 15 Feb 2018 16:47:08 -0500 Subject: [PATCH] Update logging calls to take advantage of structured logging Includes a style guide that details the basics of how to log. --- cmd/influxd/main.go | 6 +- cmd/influxd/run/command.go | 16 ++- coordinator/points_writer.go | 5 +- logger/logger.go | 49 ++++++- logger/style_guide.md | 192 +++++++++++++++++++++++++ monitor/service.go | 23 ++- monitor/service_test.go | 2 +- services/collectd/service.go | 35 +++-- services/continuous_querier/service.go | 23 +-- services/graphite/service.go | 21 ++- services/httpd/handler.go | 20 ++- services/httpd/service.go | 11 +- services/meta/client.go | 14 +- services/opentsdb/handler.go | 7 +- services/opentsdb/service.go | 29 ++-- services/precreator/service.go | 11 +- services/retention/service.go | 45 ++++-- services/snapshotter/service.go | 4 +- services/subscriber/service.go | 18 ++- services/udp/service.go | 23 +-- tsdb/engine/tsm1/cache.go | 4 +- tsdb/engine/tsm1/engine.go | 56 ++++---- tsdb/engine/tsm1/file_store.go | 11 +- tsdb/engine/tsm1/wal.go | 8 +- tsdb/store.go | 25 ++-- 25 files changed, 493 insertions(+), 165 deletions(-) create mode 100644 logger/style_guide.md diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index 3344c69cd6..21773ba22d 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -96,11 +96,11 @@ func (m *Main) Run(args ...string) error { cmd.Logger.Info("Waiting for clean shutdown...") select { case <-signalCh: - cmd.Logger.Info("second signal received, initializing hard shutdown") + cmd.Logger.Info("Second signal received, initializing hard shutdown") case <-time.After(time.Second * 30): - cmd.Logger.Info("time limit reached, initializing hard shutdown") + cmd.Logger.Info("Time limit reached, initializing hard shutdown") case <-cmd.Closed: - cmd.Logger.Info("server shutdown completed") + cmd.Logger.Info("Server shutdown completed") } // goodbye. diff --git a/cmd/influxd/run/command.go b/cmd/influxd/run/command.go index 5be5e88938..575356546f 100644 --- a/cmd/influxd/run/command.go +++ b/cmd/influxd/run/command.go @@ -99,9 +99,13 @@ func (cmd *Command) Run(args ...string) error { } // Mark start-up in log. - cmd.Logger.Info(fmt.Sprintf("InfluxDB starting, version %s, branch %s, commit %s", - cmd.Version, cmd.Branch, cmd.Commit)) - cmd.Logger.Info(fmt.Sprintf("Go version %s, GOMAXPROCS set to %d", runtime.Version(), runtime.GOMAXPROCS(0))) + cmd.Logger.Info("InfluxDB starting", + zap.String("version", cmd.Version), + zap.String("branch", cmd.Branch), + zap.String("commit", cmd.Commit)) + cmd.Logger.Info("Go runtime", + zap.String("version", runtime.Version()), + zap.Int("maxprocs", runtime.GOMAXPROCS(0))) // If there was an error on startup when creating the logger, output it now. if logErr != nil { @@ -187,7 +191,7 @@ func (cmd *Command) monitorServerErrors() { func (cmd *Command) removePIDFile() { if cmd.pidfile != "" { if err := os.Remove(cmd.pidfile); err != nil { - cmd.Logger.Error("unable to remove pidfile", zap.Error(err)) + cmd.Logger.Error("Unable to remove pidfile", zap.Error(err)) } } } @@ -235,11 +239,11 @@ func (cmd *Command) writePIDFile(path string) error { func (cmd *Command) ParseConfig(path string) (*Config, error) { // Use demo configuration if no config path is specified. if path == "" { - cmd.Logger.Info("no configuration provided, using default settings") + cmd.Logger.Info("No configuration provided, using default settings") return NewDemoConfig() } - cmd.Logger.Info(fmt.Sprintf("Using configuration at: %s", path)) + cmd.Logger.Info("Loading configuration file", zap.String("path", path)) config := NewConfig() if err := config.FromTomlFile(path); err != nil { diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index af63abfe38..47bc1dc8ef 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -2,7 +2,6 @@ package coordinator import ( "errors" - "fmt" "sort" "sync" "sync/atomic" @@ -380,7 +379,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo if err == tsdb.ErrShardNotFound { err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true) if err != nil { - w.Logger.Info(fmt.Sprintf("write failed for shard %d: %v", shard.ID, err)) + w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err)) atomic.AddInt64(&w.stats.WriteErr, 1) return err @@ -388,7 +387,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo } err = w.TSDBStore.WriteToShard(shard.ID, points) if err != nil { - w.Logger.Info(fmt.Sprintf("write failed for shard %d: %v", shard.ID, err)) + w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err)) atomic.AddInt64(&w.stats.WriteErr, 1) return err } diff --git a/logger/logger.go b/logger/logger.go index 6fe38516e9..8e61c4cb86 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -11,6 +11,8 @@ import ( "go.uber.org/zap/zapcore" ) +const TimeFormat = "2006-01-02T15:04:05.000000Z07:00" + func New(w io.Writer) *zap.Logger { config := NewConfig() l, _ := config.New(w) @@ -63,11 +65,13 @@ func newEncoder(format string) (zapcore.Encoder, error) { func newEncoderConfig() zapcore.EncoderConfig { config := zap.NewProductionEncoderConfig() config.EncodeTime = func(ts time.Time, encoder zapcore.PrimitiveArrayEncoder) { - encoder.AppendString(ts.UTC().Format(time.RFC3339)) + encoder.AppendString(ts.UTC().Format(TimeFormat)) } config.EncodeDuration = func(d time.Duration, encoder zapcore.PrimitiveArrayEncoder) { - encoder.AppendString(d.String()) + val := float64(d) / float64(time.Millisecond) + encoder.AppendString(fmt.Sprintf("%.3fms", val)) } + config.LevelKey = "lvl" return config } @@ -80,3 +84,44 @@ func IsTerminal(w io.Writer) bool { } return false } + +const ( + year = 365 * 24 * time.Hour + week = 7 * 24 * time.Hour + day = 24 * time.Hour +) + +func DurationLiteral(key string, val time.Duration) zapcore.Field { + if val == 0 { + return zap.String(key, "0s") + } + + var ( + value int + unit string + ) + switch { + case val%year == 0: + value = int(val / year) + unit = "y" + case val%week == 0: + value = int(val / week) + unit = "w" + case val%day == 0: + value = int(val / day) + unit = "d" + case val%time.Hour == 0: + value = int(val / time.Hour) + unit = "h" + case val%time.Minute == 0: + value = int(val / time.Minute) + unit = "m" + case val%time.Second == 0: + value = int(val / time.Second) + unit = "s" + default: + value = int(val / time.Millisecond) + unit = "ms" + } + return zap.String(key, fmt.Sprintf("%d%s", value, unit)) +} diff --git a/logger/style_guide.md b/logger/style_guide.md new file mode 100644 index 0000000000..7003fd3f3b --- /dev/null +++ b/logger/style_guide.md @@ -0,0 +1,192 @@ +# Logging Style Guide + +The intention of logging is to give insight to the administrator of how +the server is running and also notify the administrator of any problems +or potential problems with the system. + +At the moment, log level filtering is the only option to configure +logging in InfluxDB. Adding a logging message and choosing its level +should be done according to the guidelines in this document for +operational clarity. The available log levels are: + +* Error +* Warn +* Info +* Debug + +InfluxDB uses structured logging. Structured logging is when you log +messages and attach context to those messages with more easily read data +regarding the state of the system. A structured log message is composed +of: + +* Time +* Level +* Message +* (Optionally) Additional context + +## Guidelines + +**Log messages** should be simple statements or phrases that begin with +a capital letter, but have no punctuation at the end. The message should be a +constant so that every time it is logged it is easily identified and can +be filtered by without regular expressions. + +Any **dynamic content** should be expressed by context. The key should +be a constant and the value is the dynamic content. + +Do not log messages in tight loops or other high performance locations. +It will likely create a performance problem. + +## Naming Conventions + +If the log encoding format uses keys for the time, message, or level, +the key names should be `ts` for time, `msg` for the message, and +`lvl` for the level. + +If the log encoding format does not use keys for the time, message, or +level and instead outputs them in some other method, this guideline can +be ignored. The output formats logfmt and json both use keys when +encoding these values. + +### Context Key Names + +The key for the dynamic content in the context should be formatted in +`snake_case`. The key should be completely lower case. + +## Levels + +As a reminder, levels are usually the only way to configure what is +logged. There are four available logging levels. + +* Error +* Warn +* Info +* Debug + +It is important to get the right logging level to ensure the log +messages are useful for end users to act on. + +In general, when considering which log level to use, you should use +**info**. If you are considering using another level, read the below +expanded descriptions to determine which level your message belongs in. + +### Error + +The **error** level is intended to communicate that there is a serious +problem with the server. **An error should be emitted only when an +on-call engineer can take some action to remedy the situation _and_ the +system cannot continue operating properly without remedying the +situation.** + +An example of what may qualify as an error level message is the creation +of the internal storage for the monitor service. For that system to +function at all, a database must be created. If no database is created, +the service itself cannot function. The error has a clear actionable +solution. Figure out why the database isn't being created and create it. + +An example of what does not qualify as an error is failing to parse a +query or a socket closing prematurely. Both of these usually indicate +some kind of user error rather than system error. Both are ephemeral +errors and they would not be clearly actionable to an administrator who +was paged at 3 AM. Both of these are examples of logging messages that +should be emitted at the info level with an error key rather than being +logged at the error level. + +Logged errors **must not propagate**. Propagating the error risks +logging it in multiple locations and confusing users when the same error +is reported multiple times. In general, if you are returning an error, +never log at any level. By returning the error, you are telling the +parent function to handle the error. Logging a message at any level is +handling the error. + +This logging message should be used very rarely and any messages that +use this logging level should not repeat frequently. Assume that +anything that is logged with error will page someone in the middle of +the night. + +### Warn + +The **warn** level is intended to communicate that there is likely to be +a serious problem with the server if it not addressed. **A warning +should be emitted only when a support engineer can take some action to +remedy the situation _and_ the system may not continue operating +properly in the near future without remedying the situation.** + +An example of what may qualify as a warning is the `max-values-per-tag` +setting. If the server starts to approach the maximum number of values, +the server may stop being able to function properly when it reaches the +maximum number. + +An example of what does not qualify as a warning is the +`log-queries-after` setting. While the message is "warning" that a query +was running for a long period of time, it is not clearly actionable and +does not indicate that the server will fail in the near future. This +should be logged at the info level instead. + +This logging message should be used very rarely and any messages that +use this logging level should not repeat frequently. Assume that +anything that is logged with warn will page someone in the middle of the +night and potentially ignored until normal working hours. + +### Info + +The **info** level should be used for almost anything. If you are not +sure which logging level to use, use info. Temporary or user errors +should be logged at the info level and any informational messages for +administrators should be logged at this level. Info level messages +should be safe for an administrator to discard if they really want to, +but most people will run the system at the info level. + +### Debug + +The **debug** level exists to log messages that are useful only for +debugging a bad running instance. + +This level should be rarely used if ever. If you intend to use this +level, please have a rationale ready. Most messages that could be +considered debug either shouldn't exist or should be logged at the info +level. Debug messages will be suppressed by default. + +## Value Formatting + +Formatting for strings, integers, and other standard values are usually +determined by the log format itself and those will be kept ambiguous. +The following specific formatting choices are for data types that could +be output in multiple ways. + +### Time + +Time values should be encoded using RFC3339 with microsecond precision. +The size of the string should be normalized to the same number of digits +every time to ensure that it is easier to read the time as a column. + +### Duration + +Duration values that denote a period of time should be output in +milliseconds with microsecond precision. The microseconds should be in +decimal form with three decimal points. Durations that denote a static +period of time should be output with a single number and a suffix with +the largest possible unit that doesn't cause the value to be a decimal. + +There are two types of durations. + +* Tracks a (usually small) period of time and is meant for timing how + long something take. The content is dynamic and may be graphed. +* Duration literal where the content is dynamic, is unlikely to be + graphed, and usually comes from some type of configuration. + +If the content is dynamic, the duration should be printed as a number of +milliseconds with a decimal indicating the number of microseconds. Any +duration lower than microseconds should be truncated. The decimal section +should always print exactly 3 points after the decimal point. + +If the content is static, the duration should be printed with a single +number and a suffix indicating the unit in years (`y`), weeks (`w`), +days (`d`), hours (`h`), minutes (`m`), seconds (`s`), or +milliseconds (`ms`). The suffix should be the greatest unit that can be +used without truncating the value. As an example, if the duration is +60 minutes, then `1h` should be used. If the duration is 61 minutes, +then `61m` should be used. + +For anything lower than milliseconds that is static, the duration should +be truncated. A value of zero should be shown as `0s`. diff --git a/monitor/service.go b/monitor/service.go index 11e5bfcac3..60aa95068b 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor/diagnostics" "github.com/influxdata/influxdb/services/meta" @@ -98,7 +99,7 @@ func (m *Monitor) Open() error { return nil } - m.Logger.Info("Starting monitor system") + m.Logger.Info("Starting monitor service") // Self-register various stats and diagnostics. m.RegisterDiagnosticsClient("build", &build{ @@ -149,7 +150,7 @@ func (m *Monitor) writePoints(p models.Points) error { defer m.mu.RUnlock() if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, p); err != nil { - m.Logger.Info(fmt.Sprintf("failed to store statistics: %s", err)) + m.Logger.Info("failed to store statistics", zap.Error(err)) } return nil } @@ -157,11 +158,11 @@ func (m *Monitor) writePoints(p models.Points) error { // Close closes the monitor system. func (m *Monitor) Close() error { if !m.open() { - m.Logger.Info("Monitor is already closed.") + m.Logger.Info("Monitor is already closed") return nil } - m.Logger.Info("shutting down monitor system") + m.Logger.Info("Shutting down monitor service") m.mu.Lock() close(m.done) m.mu.Unlock() @@ -220,7 +221,7 @@ func (m *Monitor) RegisterDiagnosticsClient(name string, client diagnostics.Clie m.mu.Lock() defer m.mu.Unlock() m.diagRegistrations[name] = client - m.Logger.Info("registered for diagnostics monitoring", zap.String("name", name)) + m.Logger.Info("Registered diagnostics client", zap.String("name", name)) } // DeregisterDiagnosticsClient deregisters a diagnostics client by name. @@ -389,8 +390,7 @@ func (m *Monitor) createInternalStorage() { } if _, err := m.MetaClient.CreateDatabaseWithRetentionPolicy(m.storeDatabase, &spec); err != nil { - m.Logger.Info(fmt.Sprintf("failed to create database '%s', failed to create storage: %s", - m.storeDatabase, err.Error())) + m.Logger.Info("Failed to create storage", zap.String("db", m.storeDatabase), zap.Error(err)) return } } @@ -417,8 +417,7 @@ func (m *Monitor) waitUntilInterval(d time.Duration) error { // storeStatistics writes the statistics to an InfluxDB system. func (m *Monitor) storeStatistics() { defer m.wg.Done() - m.Logger.Info(fmt.Sprintf("Storing statistics in database '%s' retention policy '%s', at interval %s", - m.storeDatabase, m.storeRetentionPolicy, m.storeInterval)) + m.Logger.Info("Storing statistics", zap.String("db", m.storeDatabase), zap.String("rp", m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval)) // Wait until an even interval to start recording monitor statistics. // If we are interrupted before the interval for some reason, exit early. @@ -441,7 +440,7 @@ func (m *Monitor) storeStatistics() { stats, err := m.Statistics(m.globalTags) if err != nil { - m.Logger.Info(fmt.Sprintf("failed to retrieve registered statistics: %s", err)) + m.Logger.Info("Failed to retrieve registered statistics", zap.Error(err)) return } @@ -450,7 +449,7 @@ func (m *Monitor) storeStatistics() { for _, s := range stats { pt, err := models.NewPoint(s.Name, models.NewTags(s.Tags), s.Values, now) if err != nil { - m.Logger.Info(fmt.Sprintf("Dropping point %v: %v", s.Name, err)) + m.Logger.Info("Dropping point", zap.String("name", s.Name), zap.Error(err)) return } batch = append(batch, pt) @@ -466,7 +465,7 @@ func (m *Monitor) storeStatistics() { m.writePoints(batch) } case <-m.done: - m.Logger.Info(fmt.Sprintf("terminating storage of statistics")) + m.Logger.Info("Terminating storage of statistics") return } } diff --git a/monitor/service_test.go b/monitor/service_test.go index 72cfb34f8b..f734b80aec 100644 --- a/monitor/service_test.go +++ b/monitor/service_test.go @@ -61,7 +61,7 @@ func TestMonitor_SetPointsWriter_StoreEnabled(t *testing.T) { defer s.Close() // Verify that the monitor was opened by looking at the log messages. - if logs.FilterMessage("Starting monitor system").Len() == 0 { + if logs.FilterMessage("Starting monitor service").Len() == 0 { t.Errorf("monitor system was never started") } } diff --git a/services/collectd/service.go b/services/collectd/service.go index eaea858406..e1c8fd8952 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -123,7 +123,8 @@ func (s *Service) Open() error { readdir = func(path string) { files, err := ioutil.ReadDir(path) if err != nil { - s.Logger.Info(fmt.Sprintf("Unable to read directory %s: %s", path, err)) + s.Logger.Info("Unable to read directory", + zap.String("path", path), zap.Error(err)) return } @@ -134,10 +135,10 @@ func (s *Service) Open() error { continue } - s.Logger.Info(fmt.Sprintf("Loading %s", fullpath)) + s.Logger.Info("Loading types from file", zap.String("path", fullpath)) types, err := TypesDBFile(fullpath) if err != nil { - s.Logger.Info(fmt.Sprintf("Unable to parse collectd types file: %s", f.Name())) + s.Logger.Info("Unable to parse collectd types file", zap.String("path", f.Name())) continue } @@ -147,7 +148,7 @@ func (s *Service) Open() error { readdir(s.Config.TypesDB) s.popts.TypesDB = alltypesdb } else { - s.Logger.Info(fmt.Sprintf("Loading %s", s.Config.TypesDB)) + s.Logger.Info("Loading types from file", zap.String("path", s.Config.TypesDB)) types, err := TypesDBFile(s.Config.TypesDB) if err != nil { return fmt.Errorf("Open(): %s", err) @@ -194,7 +195,7 @@ func (s *Service) Open() error { } s.conn = conn - s.Logger.Info(fmt.Sprint("Listening on UDP: ", conn.LocalAddr().String())) + s.Logger.Info("Listening on UDP", zap.Stringer("addr", conn.LocalAddr())) // Start the points batcher. s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration)) @@ -241,7 +242,7 @@ func (s *Service) Close() error { s.conn = nil s.batcher = nil - s.Logger.Info("collectd UDP closed") + s.Logger.Info("Closed collectd service") s.done = nil return nil } @@ -345,8 +346,16 @@ func (s *Service) serve() { n, _, err := s.conn.ReadFromUDP(buffer) if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { + select { + case <-s.done: + return + default: + // The socket wasn't closed by us so consider it an error. + } + } atomic.AddInt64(&s.stats.ReadFail, 1) - s.Logger.Info(fmt.Sprintf("collectd ReadFromUDP error: %s", err)) + s.Logger.Info("ReadFromUDP error", zap.Error(err)) continue } if n > 0 { @@ -360,7 +369,7 @@ func (s *Service) handleMessage(buffer []byte) { valueLists, err := network.Parse(buffer, s.popts) if err != nil { atomic.AddInt64(&s.stats.PointsParseFail, 1) - s.Logger.Info(fmt.Sprintf("Collectd parse error: %s", err)) + s.Logger.Info("collectd parse error", zap.Error(err)) return } var points []models.Point @@ -385,7 +394,8 @@ func (s *Service) writePoints() { case batch := <-s.batcher.Out(): // Will attempt to create database if not yet created. if err := s.createInternalStorage(); err != nil { - s.Logger.Info(fmt.Sprintf("Required database %s not yet created: %s", s.Config.Database, err.Error())) + s.Logger.Info("Required database not yet created", + zap.String("db", s.Config.Database), zap.Error(err)) continue } @@ -393,7 +403,8 @@ func (s *Service) writePoints() { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { - s.Logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.Config.Database, err)) + s.Logger.Info("Failed to write point batch to database", + zap.String("db", s.Config.Database), zap.Error(err)) atomic.AddInt64(&s.stats.BatchesTransmitFail, 1) } } @@ -439,7 +450,7 @@ func (s *Service) UnmarshalValueListPacked(vl *api.ValueList) []models.Point { // Drop invalid points p, err := models.NewPoint(name, models.NewTags(tags), fields, timestamp) if err != nil { - s.Logger.Info(fmt.Sprintf("Dropping point %v: %v", name, err)) + s.Logger.Info("Dropping point", zap.String("name", name), zap.Error(err)) atomic.AddInt64(&s.stats.InvalidDroppedPoints, 1) return nil } @@ -483,7 +494,7 @@ func (s *Service) UnmarshalValueList(vl *api.ValueList) []models.Point { // Drop invalid points p, err := models.NewPoint(name, models.NewTags(tags), fields, timestamp) if err != nil { - s.Logger.Info(fmt.Sprintf("Dropping point %v: %v", name, err)) + s.Logger.Info("Dropping point", zap.String("name", name), zap.Error(err)) atomic.AddInt64(&s.stats.InvalidDroppedPoints, 1) continue } diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index 989029cee4..8feaf47715 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -9,6 +9,7 @@ import ( "sync/atomic" "time" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/meta" @@ -213,14 +214,14 @@ func (s *Service) backgroundLoop() { for { select { case <-s.stop: - s.Logger.Info("continuous query service terminating") + s.Logger.Info("Terminating continuous query service") return case req := <-s.RunCh: if !s.hasContinuousQueries() { continue } if _, err := s.MetaClient.AcquireLease(leaseName); err == nil { - s.Logger.Info(fmt.Sprintf("running continuous queries by request for time: %v", req.Now)) + s.Logger.Info("Running continuous queries by request", zap.Time("at", req.Now)) s.runContinuousQueries(req) } case <-t.C: @@ -261,7 +262,7 @@ func (s *Service) runContinuousQueries(req *RunRequest) { continue } if ok, err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil { - s.Logger.Info(fmt.Sprintf("error executing query: %s: err = %s", cq.Query, err)) + s.Logger.Info("Error executing query", zap.String("query", cq.Query), zap.Error(err)) atomic.AddInt64(&s.stats.QueryFail, 1) } else if ok { atomic.AddInt64(&s.stats.QueryOK, 1) @@ -356,8 +357,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti } if err := cq.q.SetTimeRange(startTime, endTime); err != nil { - s.Logger.Info(fmt.Sprintf("error setting time range: %s", err)) - return false, err + return false, fmt.Errorf("unable to set time range: %s", err) } var start time.Time @@ -366,13 +366,15 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti } if s.loggingEnabled { - s.Logger.Info(fmt.Sprintf("executing continuous query %s (%v to %v)", cq.Info.Name, startTime, endTime)) + s.Logger.Info("Executing continuous query", + zap.String("name", cq.Info.Name), + zap.Time("start", startTime), + zap.Time("end", endTime)) } // Do the actual processing of the query & writing of results. res := s.runContinuousQueryAndWriteResult(cq) if res.Err != nil { - s.Logger.Info(fmt.Sprintf("error: %s. running: %s", res.Err, cq.q.String())) return false, res.Err } @@ -389,7 +391,12 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti } if s.loggingEnabled { - s.Logger.Info(fmt.Sprintf("finished continuous query %s, %d points(s) written (%v to %v) in %s", cq.Info.Name, written, startTime, endTime, execDuration)) + s.Logger.Info("Finished continuous query", + zap.String("name", cq.Info.Name), + zap.Int64("written", written), + zap.Time("start", startTime), + zap.Time("end", endTime), + logger.DurationLiteral("duration", execDuration)) } if s.queryStatsEnabled && s.Monitor.Enabled() { diff --git a/services/graphite/service.go b/services/graphite/service.go index 3a6437b657..b86aaccd67 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor/diagnostics" "github.com/influxdata/influxdb/services/meta" @@ -133,7 +134,9 @@ func (s *Service) Open() error { } s.done = make(chan struct{}) - s.logger.Info(fmt.Sprintf("Starting graphite service, batch size %d, batch timeout %s", s.batchSize, s.batchTimeout)) + s.logger.Info("Starting graphite service", + zap.Int("batch_size", s.batchSize), + logger.DurationLiteral("batch_timeout", s.batchTimeout)) // Register diagnostics if a Monitor service is available. if s.Monitor != nil { @@ -159,9 +162,12 @@ func (s *Service) Open() error { return err } - s.logger.Info(fmt.Sprintf("Listening on %s: %s", strings.ToUpper(s.protocol), s.addr.String())) + s.logger.Info("Listening", + zap.String("protocol", s.protocol), + zap.Stringer("addr", s.addr)) return nil } + func (s *Service) closeAllConnections() { s.tcpConnectionsMu.Lock() defer s.tcpConnectionsMu.Unlock() @@ -317,11 +323,11 @@ func (s *Service) openTCPServer() (net.Addr, error) { for { conn, err := s.ln.Accept() if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() { - s.logger.Info("graphite TCP listener closed") + s.logger.Info("Graphite TCP listener closed") return } if err != nil { - s.logger.Info("error accepting TCP connection", zap.Error(err)) + s.logger.Info("Error accepting TCP connection", zap.Error(err)) continue } @@ -432,7 +438,7 @@ func (s *Service) handleLine(line string) { return } } - s.logger.Info(fmt.Sprintf("unable to parse line: %s: %s", line, err)) + s.logger.Info("Unable to parse line", zap.String("line", line), zap.Error(err)) atomic.AddInt64(&s.stats.PointsParseFail, 1) return } @@ -448,7 +454,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { case batch := <-batcher.Out(): // Will attempt to create database if not yet created. if err := s.createInternalStorage(); err != nil { - s.logger.Info(fmt.Sprintf("Required database or retention policy do not yet exist: %s", err.Error())) + s.logger.Info("Required database or retention policy do not yet exist", zap.Error(err)) continue } @@ -456,7 +462,8 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { - s.logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.database, err)) + s.logger.Info("Failed to write point batch to database", + zap.String("db", s.database), zap.Error(err)) atomic.AddInt64(&s.stats.BatchesTransmitFail, 1) } diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 3c097d6d83..9f28341e43 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -392,7 +392,10 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U if h.Config.AuthEnabled { if err := h.QueryAuthorizer.AuthorizeQuery(user, q, db); err != nil { if err, ok := err.(meta.ErrAuthorize); ok { - h.Logger.Info(fmt.Sprintf("Unauthorized request | user: %q | query: %q | database %q", err.User, err.Query.String(), err.Database)) + h.Logger.Info("Unauthorized request", + zap.String("user", err.User), + zap.Stringer("query", err.Query), + zap.String("db", err.Database)) } h.httpError(rw, "error authorizing query: "+err.Error(), http.StatusForbidden) return @@ -598,7 +601,9 @@ func (h *Handler) async(q *influxql.Query, results <-chan *query.Result) { if r.Err == query.ErrNotExecuted { continue } - h.Logger.Info(fmt.Sprintf("error while running async query: %s: %s", q, r.Err)) + h.Logger.Info("Error while running async query", + zap.Stringer("query", q), + zap.Error(r.Err)) } } } @@ -681,7 +686,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.U atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len())) if h.Config.WriteTracing { - h.Logger.Info(fmt.Sprintf("Write body received by handler: %s", buf.Bytes())) + h.Logger.Info("Write body received by handler", zap.ByteString("body", buf.Bytes())) } points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision")) @@ -851,7 +856,7 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len())) if h.Config.WriteTracing { - h.Logger.Info(fmt.Sprintf("Prom write body received by handler: %s", buf.Bytes())) + h.Logger.Info("Prom write body received by handler", zap.ByteString("body", buf.Bytes())) } reqBuf, err := snappy.Decode(nil, buf.Bytes()) @@ -870,7 +875,7 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me points, err := prometheus.WriteRequestToPoints(&req) if err != nil { if h.Config.WriteTracing { - h.Logger.Info(fmt.Sprintf("Prom write handler: %s", err.Error())) + h.Logger.Info("Prom write handler", zap.Error(err)) } if err != prometheus.ErrNaNDropped { @@ -947,7 +952,10 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met if h.Config.AuthEnabled { if err := h.QueryAuthorizer.AuthorizeQuery(user, q, db); err != nil { if err, ok := err.(meta.ErrAuthorize); ok { - h.Logger.Info(fmt.Sprintf("Unauthorized request | user: %q | query: %q | database %q", err.User, err.Query.String(), err.Database)) + h.Logger.Info("Unauthorized request", + zap.String("user", err.User), + zap.Stringer("query", err.Query), + zap.String("db", err.Database)) } h.httpError(w, "error authorizing query: "+err.Error(), http.StatusForbidden) return diff --git a/services/httpd/service.go b/services/httpd/service.go index 5d0711b5d2..f6d530348c 100644 --- a/services/httpd/service.go +++ b/services/httpd/service.go @@ -86,8 +86,7 @@ func NewService(c Config) *Service { // Open starts the service. func (s *Service) Open() error { - s.Logger.Info("Starting HTTP service") - s.Logger.Info(fmt.Sprint("Authentication enabled:", s.Handler.Config.AuthEnabled)) + s.Logger.Info("Starting HTTP service", zap.Bool("authentication", s.Handler.Config.AuthEnabled)) // Open listener. if s.https { @@ -103,7 +102,6 @@ func (s *Service) Open() error { return err } - s.Logger.Info(fmt.Sprint("Listening on HTTPS:", listener.Addr().String())) s.ln = listener } else { listener, err := net.Listen("tcp", s.addr) @@ -111,9 +109,11 @@ func (s *Service) Open() error { return err } - s.Logger.Info(fmt.Sprint("Listening on HTTP:", listener.Addr().String())) s.ln = listener } + s.Logger.Info("Listening on HTTP", + zap.Stringer("addr", s.ln.Addr()), + zap.Bool("https", s.https)) // Open unix socket listener. if s.unixSocket { @@ -132,7 +132,8 @@ func (s *Service) Open() error { return err } - s.Logger.Info(fmt.Sprint("Listening on unix socket:", listener.Addr().String())) + s.Logger.Info("Listening on unix socket", + zap.Stringer("addr", listener.Addr())) s.unixSocketListener = listener go s.serveUnixSocket() diff --git a/services/meta/client.go b/services/meta/client.go index 26935c61dc..7bc4f03256 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -7,7 +7,6 @@ import ( crand "crypto/rand" "crypto/sha256" "errors" - "fmt" "io" "io/ioutil" "math/rand" @@ -800,16 +799,23 @@ func (c *Client) PrecreateShardGroups(from, to time.Time) error { nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond) // if it already exists, continue if sg, _ := data.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); sg != nil { - c.logger.Info(fmt.Sprintf("shard group %d exists for database %s, retention policy %s", sg.ID, di.Name, rp.Name)) + c.logger.Info("Shard group already exists", + zap.Uint64("id", sg.ID), + zap.String("db", di.Name), + zap.String("rp", rp.Name)) continue } newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime) if err != nil { - c.logger.Info(fmt.Sprintf("failed to precreate successive shard group for group %d: %s", g.ID, err.Error())) + c.logger.Info("Failed to precreate successive shard group", + zap.Uint64("group_id", g.ID), zap.Error(err)) continue } changed = true - c.logger.Info(fmt.Sprintf("new shard group %d successfully precreated for database %s, retention policy %s", newGroup.ID, di.Name, rp.Name)) + c.logger.Info("New shard group successfully precreated", + zap.Uint64("group_id", newGroup.ID), + zap.String("db", di.Name), + zap.String("rp", rp.Name)) } } } diff --git a/services/opentsdb/handler.go b/services/opentsdb/handler.go index f41a6fb23c..d6726121dd 100644 --- a/services/opentsdb/handler.go +++ b/services/opentsdb/handler.go @@ -5,7 +5,6 @@ import ( "compress/gzip" "encoding/json" "errors" - "fmt" "io" "net" "net/http" @@ -116,7 +115,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) { pt, err := models.NewPoint(p.Metric, models.NewTags(p.Tags), map[string]interface{}{"value": p.Value}, ts) if err != nil { - h.Logger.Info(fmt.Sprintf("Dropping point %v: %v", p.Metric, err)) + h.Logger.Info("Dropping point", zap.String("name", p.Metric), zap.Error(err)) if h.stats != nil { atomic.AddInt64(&h.stats.InvalidDroppedPoints, 1) } @@ -127,11 +126,11 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) { // Write points. if err := h.PointsWriter.WritePointsPrivileged(h.Database, h.RetentionPolicy, models.ConsistencyLevelAny, points); influxdb.IsClientError(err) { - h.Logger.Info(fmt.Sprint("write series error: ", err)) + h.Logger.Info("Write series error", zap.Error(err)) http.Error(w, "write series error: "+err.Error(), http.StatusBadRequest) return } else if err != nil { - h.Logger.Info(fmt.Sprint("write series error: ", err)) + h.Logger.Info("Write series error", zap.Error(err)) http.Error(w, "write series error: "+err.Error(), http.StatusInternalServerError) return } diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 05a4d9faae..73c979af27 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -5,7 +5,6 @@ import ( "bufio" "bytes" "crypto/tls" - "fmt" "io" "net" "net/http" @@ -134,7 +133,6 @@ func (s *Service) Open() error { return err } - s.Logger.Info(fmt.Sprint("Listening on TLS: ", listener.Addr().String())) s.ln = listener } else { listener, err := net.Listen("tcp", s.BindAddress) @@ -142,9 +140,11 @@ func (s *Service) Open() error { return err } - s.Logger.Info(fmt.Sprint("Listening on: ", listener.Addr().String())) s.ln = listener } + s.Logger.Info("Listening on TCP", + zap.Stringer("addr", s.ln.Addr()), + zap.Bool("tls", s.tls)) s.httpln = newChanListener(s.ln.Addr()) // Begin listening for connections. @@ -294,10 +294,10 @@ func (s *Service) serve() { // Wait for next connection. conn, err := s.ln.Accept() if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() { - s.Logger.Info("openTSDB TCP listener closed") + s.Logger.Info("OpenTSDB TCP listener closed") return } else if err != nil { - s.Logger.Info(fmt.Sprint("error accepting openTSDB: ", err.Error())) + s.Logger.Info("Error accepting OpenTSDB", zap.Error(err)) continue } @@ -355,7 +355,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) { if err != nil { if err != io.EOF { atomic.AddInt64(&s.stats.TelnetReadError, 1) - s.Logger.Info(fmt.Sprint("error reading from openTSDB connection ", err.Error())) + s.Logger.Info("Error reading from OpenTSDB connection", zap.Error(err)) } return } @@ -372,7 +372,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) { if len(inputStrs) < 4 || inputStrs[0] != "put" { atomic.AddInt64(&s.stats.TelnetBadLine, 1) if s.LogPointErrors { - s.Logger.Info(fmt.Sprintf("malformed line '%s' from %s", line, remoteAddr)) + s.Logger.Info("Malformed line", zap.String("line", line), zap.String("remote_addr", remoteAddr)) } continue } @@ -387,7 +387,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) { if err != nil { atomic.AddInt64(&s.stats.TelnetBadTime, 1) if s.LogPointErrors { - s.Logger.Info(fmt.Sprintf("malformed time '%s' from %s", tsStr, remoteAddr)) + s.Logger.Info("Malformed time", zap.String("time", tsStr), zap.String("remote_addr", remoteAddr)) } } @@ -399,7 +399,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) { default: atomic.AddInt64(&s.stats.TelnetBadTime, 1) if s.LogPointErrors { - s.Logger.Info(fmt.Sprintf("bad time '%s' must be 10 or 13 chars, from %s ", tsStr, remoteAddr)) + s.Logger.Info("Time must be 10 or 13 chars", zap.String("time", tsStr), zap.String("remote_addr", remoteAddr)) } continue } @@ -410,7 +410,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) { if len(parts) != 2 || parts[0] == "" || parts[1] == "" { atomic.AddInt64(&s.stats.TelnetBadTag, 1) if s.LogPointErrors { - s.Logger.Info(fmt.Sprintf("malformed tag data '%v' from %s", tagStrs[t], remoteAddr)) + s.Logger.Info("Malformed tag data", zap.String("tag", tagStrs[t]), zap.String("remote_addr", remoteAddr)) } continue } @@ -424,7 +424,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) { if err != nil { atomic.AddInt64(&s.stats.TelnetBadFloat, 1) if s.LogPointErrors { - s.Logger.Info(fmt.Sprintf("bad float '%s' from %s", valueStr, remoteAddr)) + s.Logger.Info("Bad float", zap.String("value", valueStr), zap.String("remote_addr", remoteAddr)) } continue } @@ -434,7 +434,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) { if err != nil { atomic.AddInt64(&s.stats.TelnetBadFloat, 1) if s.LogPointErrors { - s.Logger.Info(fmt.Sprintf("bad float '%s' from %s", valueStr, remoteAddr)) + s.Logger.Info("Bad float", zap.String("value", valueStr), zap.String("remote_addr", remoteAddr)) } continue } @@ -464,7 +464,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { case batch := <-batcher.Out(): // Will attempt to create database if not yet created. if err := s.createInternalStorage(); err != nil { - s.Logger.Info(fmt.Sprintf("Required database %s does not yet exist: %s", s.Database, err.Error())) + s.Logger.Info("Required database does not yet exist", zap.String("db", s.Database), zap.Error(err)) continue } @@ -472,7 +472,8 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { - s.Logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.Database, err)) + s.Logger.Info("Failed to write point batch to database", + zap.String("db", s.Database), zap.Error(err)) atomic.AddInt64(&s.stats.BatchesTransmitFail, 1) } } diff --git a/services/precreator/service.go b/services/precreator/service.go index deab6029ed..b9b28d2c81 100644 --- a/services/precreator/service.go +++ b/services/precreator/service.go @@ -2,10 +2,10 @@ package precreator // import "github.com/influxdata/influxdb/services/precreator" import ( - "fmt" "sync" "time" + "github.com/influxdata/influxdb/logger" "go.uber.org/zap" ) @@ -44,8 +44,9 @@ func (s *Service) Open() error { return nil } - s.Logger.Info(fmt.Sprintf("Starting precreation service with check interval of %s, advance period of %s", - s.checkInterval, s.advancePeriod)) + s.Logger.Info("Starting precreation service", + logger.DurationLiteral("check_interval", s.checkInterval), + logger.DurationLiteral("advance_period", s.advancePeriod)) s.done = make(chan struct{}) @@ -75,10 +76,10 @@ func (s *Service) runPrecreation() { select { case <-time.After(s.checkInterval): if err := s.precreate(time.Now().UTC()); err != nil { - s.Logger.Info(fmt.Sprintf("failed to precreate shards: %s", err.Error())) + s.Logger.Info("Failed to precreate shards", zap.Error(err)) } case <-s.done: - s.Logger.Info("Precreation service terminating") + s.Logger.Info("Terminating precreation service") return } } diff --git a/services/retention/service.go b/services/retention/service.go index 9c6e9f0fd5..0b191296aa 100644 --- a/services/retention/service.go +++ b/services/retention/service.go @@ -2,10 +2,10 @@ package retention // import "github.com/influxdata/influxdb/services/retention" import ( - "fmt" "sync" "time" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/services/meta" "go.uber.org/zap" ) @@ -43,7 +43,8 @@ func (s *Service) Open() error { return nil } - s.logger.Info("Starting retention policy enforcement service", zap.String("check-interval", s.config.CheckInterval.String())) + s.logger.Info("Starting retention policy enforcement service", + logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval))) s.done = make(chan struct{}) s.wg.Add(1) @@ -57,7 +58,7 @@ func (s *Service) Close() error { return nil } - s.logger.Info("Retention policy enforcement service closing.") + s.logger.Info("Closing retention policy enforcement service") close(s.done) s.wg.Wait() @@ -79,7 +80,7 @@ func (s *Service) run() { return case <-ticker.C: - s.logger.Info("Retention policy shard deletion check commencing.") + s.logger.Info("Shard deletion check commencing") type deletionInfo struct { db string @@ -87,16 +88,29 @@ func (s *Service) run() { } deletedShardIDs := make(map[uint64]deletionInfo) + // Mark down if an error occurred during this function so we can inform the + // user that we will try again on the next interval. + // Without the message, they may see the error message and assume they + // have to do it manually. + var retryNeeded bool dbs := s.MetaClient.Databases() for _, d := range dbs { for _, r := range d.RetentionPolicies { for _, g := range r.ExpiredShardGroups(time.Now().UTC()) { if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil { - s.logger.Info(fmt.Sprintf("Failed to delete shard group %d from database %s, retention policy %s: %v. Retry in %v.", g.ID, d.Name, r.Name, err, s.config.CheckInterval)) + s.logger.Info("Failed to delete shard group", + zap.Uint64("id", g.ID), + zap.String("db", d.Name), + zap.String("rp", r.Name), + zap.Error(err)) + retryNeeded = true continue } - s.logger.Info(fmt.Sprintf("Deleted shard group %d from database %s, retention policy %s.", g.ID, d.Name, r.Name)) + s.logger.Info("Deleted shard group", + zap.Uint64("id", g.ID), + zap.String("db", d.Name), + zap.String("rp", r.Name)) // Store all the shard IDs that may possibly need to be removed locally. for _, sh := range g.Shards { @@ -110,15 +124,28 @@ func (s *Service) run() { for _, id := range s.TSDBStore.ShardIDs() { if info, ok := deletedShardIDs[id]; ok { if err := s.TSDBStore.DeleteShard(id); err != nil { - s.logger.Error(fmt.Sprintf("Failed to delete shard ID %d from database %s, retention policy %s: %v. Will retry in %v", id, info.db, info.rp, err, s.config.CheckInterval)) + s.logger.Info("Failed to delete shard", + zap.Uint64("id", id), + zap.String("db", info.db), + zap.String("rp", info.rp), + zap.Error(err)) + retryNeeded = true continue } - s.logger.Info(fmt.Sprintf("Shard ID %d from database %s, retention policy %s, deleted.", id, info.db, info.rp)) + s.logger.Info("Deleted shard", + zap.Uint64("id", id), + zap.String("db", info.db), + zap.String("rp", info.rp)) } } if err := s.MetaClient.PruneShardGroups(); err != nil { - s.logger.Info(fmt.Sprintf("Problem pruning shard groups: %s. Will retry in %v", err, s.config.CheckInterval)) + s.logger.Info("Problem pruning shard groups", zap.Error(err)) + retryNeeded = true + } + + if retryNeeded { + s.logger.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval))) } } } diff --git a/services/snapshotter/service.go b/services/snapshotter/service.go index d95960ef48..481d33160e 100644 --- a/services/snapshotter/service.go +++ b/services/snapshotter/service.go @@ -93,10 +93,10 @@ func (s *Service) serve() { // Wait for next connection. conn, err := s.Listener.Accept() if err != nil && strings.Contains(err.Error(), "connection closed") { - s.Logger.Info("snapshot listener closed") + s.Logger.Info("Listener closed") return } else if err != nil { - s.Logger.Info(fmt.Sprint("error accepting snapshot request: ", err.Error())) + s.Logger.Info("Error accepting snapshot request", zap.Error(err)) continue } diff --git a/services/subscriber/service.go b/services/subscriber/service.go index 034c9a244e..74afc153cd 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -101,7 +101,7 @@ func (s *Service) Open() error { s.waitForMetaUpdates() }() - s.Logger.Info("opened service") + s.Logger.Info("Opened service") return nil } @@ -121,7 +121,7 @@ func (s *Service) Close() error { close(s.closing) s.wg.Wait() - s.Logger.Info("closed service") + s.Logger.Info("Closed service") return nil } @@ -165,7 +165,7 @@ func (s *Service) waitForMetaUpdates() { case <-ch: err := s.Update() if err != nil { - s.Logger.Info(fmt.Sprint("error updating subscriptions: ", err)) + s.Logger.Info("Error updating subscriptions", zap.Error(err)) } case <-s.closing: return @@ -296,7 +296,7 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) { sub, err := s.createSubscription(se, si.Mode, si.Destinations) if err != nil { atomic.AddInt64(&s.stats.CreateFailures, 1) - s.Logger.Info(fmt.Sprintf("Subscription creation failed for '%s' with error: %s", si.Name, err)) + s.Logger.Info("Subscription creation failed", zap.String("name", si.Name), zap.Error(err)) continue } cw := chanWriter{ @@ -314,7 +314,9 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) { }() } s.subs[se] = cw - s.Logger.Info(fmt.Sprintf("added new subscription for %s %s", se.db, se.rp)) + s.Logger.Info("Added new subscription", + zap.String("db", se.db), + zap.String("rp", se.rp)) } } } @@ -327,7 +329,9 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) { // Remove it from the set delete(s.subs, se) - s.Logger.Info(fmt.Sprintf("deleted old subscription for %s %s", se.db, se.rp)) + s.Logger.Info("Deleted old subscription", + zap.String("db", se.db), + zap.String("rp", se.rp)) } } } @@ -341,7 +345,7 @@ func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) { return NewHTTP(u.String(), time.Duration(s.conf.HTTPTimeout)) case "https": if s.conf.InsecureSkipVerify { - s.Logger.Info("WARNING: 'insecure-skip-verify' is true. This will skip all certificate verifications.") + s.Logger.Warn("'insecure-skip-verify' is true. This will skip all certificate verifications.") } return NewHTTPS(u.String(), time.Duration(s.conf.HTTPTimeout), s.conf.InsecureSkipVerify, s.conf.CaCerts) default: diff --git a/services/udp/service.go b/services/udp/service.go index e950574d8c..69e7bad9a0 100644 --- a/services/udp/service.go +++ b/services/udp/service.go @@ -3,7 +3,6 @@ package udp // import "github.com/influxdata/influxdb/services/udp" import ( "errors" - "fmt" "net" "sync" "sync/atomic" @@ -92,28 +91,30 @@ func (s *Service) Open() (err error) { s.addr, err = net.ResolveUDPAddr("udp", s.config.BindAddress) if err != nil { - s.Logger.Info(fmt.Sprintf("Failed to resolve UDP address %s: %s", s.config.BindAddress, err)) + s.Logger.Info("Failed to resolve UDP address", + zap.String("bind_address", s.config.BindAddress), zap.Error(err)) return err } s.conn, err = net.ListenUDP("udp", s.addr) if err != nil { - s.Logger.Info(fmt.Sprintf("Failed to set up UDP listener at address %s: %s", s.addr, err)) + s.Logger.Info("Failed to set up UDP listener", + zap.Stringer("addr", s.addr), zap.Error(err)) return err } if s.config.ReadBuffer != 0 { err = s.conn.SetReadBuffer(s.config.ReadBuffer) if err != nil { - s.Logger.Info(fmt.Sprintf("Failed to set UDP read buffer to %d: %s", - s.config.ReadBuffer, err)) + s.Logger.Info("Failed to set UDP read buffer", + zap.Int("buffer_size", s.config.ReadBuffer), zap.Error(err)) return err } } s.batcher = tsdb.NewPointBatcher(s.config.BatchSize, s.config.BatchPending, time.Duration(s.config.BatchTimeout)) s.batcher.Start() - s.Logger.Info(fmt.Sprintf("Started listening on UDP: %s", s.config.BindAddress)) + s.Logger.Info("Started listening on UDP", zap.String("addr", s.config.BindAddress)) s.wg.Add(3) go s.serve() @@ -159,7 +160,8 @@ func (s *Service) writer() { case batch := <-s.batcher.Out(): // Will attempt to create database if not yet created. if err := s.createInternalStorage(); err != nil { - s.Logger.Info(fmt.Sprintf("Required database %s does not yet exist: %s", s.config.Database, err.Error())) + s.Logger.Info("Required database does not yet exist", + zap.String("db", s.config.Database), zap.Error(err)) continue } @@ -167,7 +169,8 @@ func (s *Service) writer() { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { - s.Logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.config.Database, err)) + s.Logger.Info("Failed to write point batch to database", + zap.String("db", s.config.Database), zap.Error(err)) atomic.AddInt64(&s.stats.BatchesTransmitFail, 1) } @@ -191,7 +194,7 @@ func (s *Service) serve() { n, _, err := s.conn.ReadFromUDP(buf) if err != nil { atomic.AddInt64(&s.stats.ReadFail, 1) - s.Logger.Info(fmt.Sprintf("Failed to read UDP message: %s", err)) + s.Logger.Info("Failed to read UDP message", zap.Error(err)) continue } atomic.AddInt64(&s.stats.BytesReceived, int64(n)) @@ -214,7 +217,7 @@ func (s *Service) parser() { points, err := models.ParsePointsWithPrecision(buf, time.Now().UTC(), s.config.Precision) if err != nil { atomic.AddInt64(&s.stats.PointsParseFail, 1) - s.Logger.Info(fmt.Sprintf("Failed to parse points: %s", err)) + s.Logger.Info("Failed to parse points", zap.Error(err)) continue } diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index ef5e20368a..8525ddfc46 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -680,7 +680,7 @@ func (cl *CacheLoader) Load(cache *Cache) error { if err != nil { return err } - cl.Logger.Info(fmt.Sprintf("reading file %s, size %d", f.Name(), stat.Size())) + cl.Logger.Info("Reading file", zap.String("path", f.Name()), zap.Int64("size", stat.Size())) // Nothing to read, skip it if stat.Size() == 0 { @@ -698,7 +698,7 @@ func (cl *CacheLoader) Load(cache *Cache) error { entry, err := r.Read() if err != nil { n := r.Count() - cl.Logger.Info(fmt.Sprintf("file %s corrupt at position %d, truncating", f.Name(), n)) + cl.Logger.Info("File corrupt", zap.String("path", f.Name()), zap.Int64("pos", n)) if err := f.Truncate(n); err != nil { return err } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 1747a507ec..a349266f0a 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -742,7 +742,8 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { if err := e.Cache.ApplyEntryFn(func(key []byte, entry *entry) error { fieldType, err := entry.values.InfluxQLType() if err != nil { - e.logger.Info(fmt.Sprintf("error getting the data type of values for key %s: %s", key, err.Error())) + e.logger.Info("Error getting the data type of values for key", + zap.ByteString("key", key), zap.Error(err)) } if err := e.addToIndexFromKey(key, fieldType); err != nil { @@ -758,7 +759,8 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { return err } - e.traceLogger.Info(fmt.Sprintf("Meta data index for shard %d loaded in %v", shardID, time.Since(now))) + e.traceLogger.Info("Meta data index for shard loaded", + zap.Uint64("id", shardID), zap.Duration("duration", time.Since(now))) return nil } @@ -1561,7 +1563,9 @@ func (e *Engine) WriteSnapshot() error { defer func() { if started != nil { e.Cache.UpdateCompactTime(time.Since(*started)) - e.logger.Info(fmt.Sprintf("Snapshot for path %s written in %v", e.path, time.Since(*started))) + e.logger.Info("Snapshot for path written", + zap.String("path", e.path), + zap.Duration("duration", time.Since(*started))) } }() @@ -1603,7 +1607,9 @@ func (e *Engine) WriteSnapshot() error { // holding the engine write lock. dedup := time.Now() snapshot.Deduplicate() - e.traceLogger.Info(fmt.Sprintf("Snapshot for path %s deduplicated in %v", e.path, time.Since(dedup))) + e.traceLogger.Info("Snapshot for path deduplicated", + zap.String("path", e.path), + zap.Duration("duration", time.Since(dedup))) return e.writeSnapshotAndCommit(closedFiles, snapshot) } @@ -1637,7 +1643,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( // write the new snapshot files newFiles, err := e.Compactor.WriteSnapshot(snapshot) if err != nil { - e.logger.Info(fmt.Sprintf("error writing snapshot from compactor: %v", err)) + e.logger.Info("Error writing snapshot from compactor", zap.Error(err)) return err } @@ -1646,7 +1652,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( // update the file store with these new files if err := e.FileStore.Replace(nil, newFiles); err != nil { - e.logger.Info(fmt.Sprintf("error adding new TSM files from snapshot: %v", err)) + e.logger.Info("Error adding new TSM files from snapshot", zap.Error(err)) return err } @@ -1654,7 +1660,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( e.Cache.ClearSnapshot(true) if err := e.WAL.Remove(closedFiles); err != nil { - e.logger.Info(fmt.Sprintf("error removing closed wal segments: %v", err)) + e.logger.Info("Error removing closed WAL segments", zap.Error(err)) } return nil @@ -1677,10 +1683,10 @@ func (e *Engine) compactCache() { e.Cache.UpdateAge() if e.ShouldCompactCache(e.WAL.LastWriteTime()) { start := time.Now() - e.traceLogger.Info(fmt.Sprintf("Compacting cache for %s", e.path)) + e.traceLogger.Info("Compacting cache", zap.String("path", e.path)) err := e.WriteSnapshot() if err != nil && err != errCompactionsDisabled { - e.logger.Info(fmt.Sprintf("error writing snapshot: %v", err)) + e.logger.Info("Error writing snapshot", zap.Error(err)) atomic.AddInt64(&e.stats.CacheCompactionErrors, 1) } else { atomic.AddInt64(&e.stats.CacheCompactions, 1) @@ -1881,9 +1887,8 @@ func (e *Engine) compactFull(grp CompactionGroup, wg *sync.WaitGroup) bool { type compactionStrategy struct { group CompactionGroup - fast bool - description string - level int + fast bool + level int durationStat *int64 activeStat *int64 @@ -1908,9 +1913,9 @@ func (s *compactionStrategy) Apply() { func (s *compactionStrategy) compactGroup() { group := s.group start := time.Now() - s.logger.Info(fmt.Sprintf("beginning %s compaction, %d TSM files", s.description, len(group))) + s.logger.Info("Beginning compaction", zap.Int("files", len(group))) for i, f := range group { - s.logger.Info(fmt.Sprintf("compacting %s %s (#%d)", s.description, f, i)) + s.logger.Info("Compacting file", zap.Int("index", i), zap.String("file", f)) } var ( @@ -1927,7 +1932,7 @@ func (s *compactionStrategy) compactGroup() { if err != nil { _, inProgress := err.(errCompactionInProgress) if err == errCompactionsDisabled || inProgress { - s.logger.Info(fmt.Sprintf("aborted %s compaction. %v", s.description, err)) + s.logger.Info("Aborted compaction", zap.Error(err)) if _, ok := err.(errCompactionInProgress); ok { time.Sleep(time.Second) @@ -1935,23 +1940,26 @@ func (s *compactionStrategy) compactGroup() { return } - s.logger.Info(fmt.Sprintf("error compacting TSM files: %v", err)) + s.logger.Info("Error compacting TSM files", zap.Error(err)) atomic.AddInt64(s.errorStat, 1) time.Sleep(time.Second) return } if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil { - s.logger.Info(fmt.Sprintf("error replacing new TSM files: %v", err)) + s.logger.Info("Error replacing new TSM files", zap.Error(err)) atomic.AddInt64(s.errorStat, 1) time.Sleep(time.Second) return } for i, f := range files { - s.logger.Info(fmt.Sprintf("compacted %s into %s (#%d)", s.description, f, i)) + s.logger.Info("Compacted file", zap.Int("index", i), zap.String("file", f)) } - s.logger.Info(fmt.Sprintf("compacted %s %d files into %d files in %s", s.description, len(group), len(files), time.Since(start))) + s.logger.Info("Finished compacting files", + zap.Int("groups", len(group)), + zap.Int("files", len(files)), + zap.Duration("duration", time.Since(start))) atomic.AddInt64(s.successStat, 1) } @@ -1960,14 +1968,13 @@ func (s *compactionStrategy) compactGroup() { func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy { return &compactionStrategy{ group: group, - logger: e.logger, + logger: e.logger.With(zap.Int("level", level), zap.String("strategy", "level")), fileStore: e.FileStore, compactor: e.Compactor, fast: fast, engine: e, level: level, - description: fmt.Sprintf("level %d", level), activeStat: &e.stats.TSMCompactionsActive[level-1], successStat: &e.stats.TSMCompactions[level-1], errorStat: &e.stats.TSMCompactionErrors[level-1], @@ -1980,7 +1987,7 @@ func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *compactionStrategy { s := &compactionStrategy{ group: group, - logger: e.logger, + logger: e.logger.With(zap.String("strategy", "full"), zap.Bool("optimize", optimize)), fileStore: e.FileStore, compactor: e.Compactor, fast: optimize, @@ -1989,13 +1996,11 @@ func (e *Engine) fullCompactionStrategy(group CompactionGroup, optimize bool) *c } if optimize { - s.description = "optimize" s.activeStat = &e.stats.TSMOptimizeCompactionsActive s.successStat = &e.stats.TSMOptimizeCompactions s.errorStat = &e.stats.TSMOptimizeCompactionErrors s.durationStat = &e.stats.TSMOptimizeCompactionDuration } else { - s.description = "full" s.activeStat = &e.stats.TSMFullCompactionsActive s.successStat = &e.stats.TSMFullCompactions s.errorStat = &e.stats.TSMFullCompactionErrors @@ -2027,7 +2032,8 @@ func (e *Engine) reloadCache() error { return err } - e.traceLogger.Info(fmt.Sprintf("Reloaded WAL cache %s in %v", e.WAL.Path(), time.Since(now))) + e.traceLogger.Info("Reloaded WAL cache", + zap.String("path", e.WAL.Path()), zap.Duration("duration", time.Since(now))) return nil } diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 52795fcd4d..09d04a0fc2 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -483,7 +483,10 @@ func (f *FileStore) Open() error { go func(idx int, file *os.File) { start := time.Now() df, err := NewTSMReader(file) - f.logger.Info(fmt.Sprintf("%s (#%d) opened in %v", file.Name(), idx, time.Since(start))) + f.logger.Info("Opened file", + zap.String("path", file.Name()), + zap.Int("id", idx), + zap.Duration("duration", time.Since(start))) if err != nil { readerC <- &res{r: df, err: fmt.Errorf("error opening memory map for file %s: %v", file.Name(), err)} @@ -926,7 +929,7 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location { // CreateSnapshot creates hardlinks for all tsm and tombstone files // in the path provided. func (f *FileStore) CreateSnapshot() (string, error) { - f.traceLogger.Info(fmt.Sprintf("Creating snapshot in %s", f.dir)) + f.traceLogger.Info("Creating snapshot", zap.String("dir", f.dir)) files := f.Files() f.mu.Lock() @@ -1275,12 +1278,12 @@ func (p *purger) purge() { for k, v := range p.files { if !v.InUse() { if err := v.Close(); err != nil { - p.logger.Info(fmt.Sprintf("purge: close file: %v", err)) + p.logger.Info("Purge: close file", zap.Error(err)) continue } if err := v.Remove(); err != nil { - p.logger.Info(fmt.Sprintf("purge: remove file: %v", err)) + p.logger.Info("Purge: remove file", zap.Error(err)) continue } delete(p.files, k) diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 7b50e0207e..f50b8b8ba4 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -184,8 +184,8 @@ func (l *WAL) Open() error { l.mu.Lock() defer l.mu.Unlock() - l.traceLogger.Info(fmt.Sprintf("tsm1 WAL starting with %d segment size", l.SegmentSize)) - l.traceLogger.Info(fmt.Sprintf("tsm1 WAL writing to %s", l.path)) + l.traceLogger.Info("tsm1 WAL starting", zap.Int("segment_size", l.SegmentSize)) + l.traceLogger.Info("tsm1 WAL writing", zap.String("path", l.path)) if err := os.MkdirAll(l.path, 0777); err != nil { return err @@ -350,7 +350,7 @@ func (l *WAL) Remove(files []string) error { l.mu.Lock() defer l.mu.Unlock() for _, fn := range files { - l.traceLogger.Info(fmt.Sprintf("Removing %s", fn)) + l.traceLogger.Info("Removing WAL file", zap.String("path", fn)) os.RemoveAll(fn) } @@ -523,7 +523,7 @@ func (l *WAL) Close() error { l.once.Do(func() { // Close, but don't set to nil so future goroutines can still be signaled - l.traceLogger.Info(fmt.Sprintf("Closing %s", l.path)) + l.traceLogger.Info("Closing WAL file", zap.String("path", l.path)) close(l.closing) if l.currentSegmentWriter != nil { diff --git a/tsdb/store.go b/tsdb/store.go index 05df418313..44cf0c0f04 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -100,13 +100,13 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic { for _, database := range databases { sc, err := s.SeriesCardinality(database) if err != nil { - s.Logger.Error("cannot retrieve series cardinality", zap.Error(err)) + s.Logger.Info("Cannot retrieve series cardinality", zap.Error(err)) continue } mc, err := s.MeasurementsCardinality(database) if err != nil { - s.Logger.Error("cannot retrieve measurement cardinality", zap.Error(err)) + s.Logger.Info("Cannot retrieve measurement cardinality", zap.Error(err)) continue } @@ -144,7 +144,7 @@ func (s *Store) Open() error { s.closing = make(chan struct{}) s.shards = map[uint64]*Shard{} - s.Logger.Info(fmt.Sprintf("Using data dir: %v", s.Path())) + s.Logger.Info("Using data dir", zap.String("path", s.Path())) // Create directory. if err := os.MkdirAll(s.path, 0777); err != nil { @@ -210,7 +210,7 @@ func (s *Store) loadShards() error { for _, db := range dbDirs { if !db.IsDir() { - s.Logger.Info("Not loading. Not a database directory.", zap.String("name", db.Name())) + s.Logger.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory")) continue } @@ -234,7 +234,7 @@ func (s *Store) loadShards() error { for _, rp := range rpDirs { if !rp.IsDir() { - s.Logger.Info(fmt.Sprintf("Skipping retention policy dir: %s. Not a directory", rp.Name())) + s.Logger.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory")) continue } @@ -291,7 +291,7 @@ func (s *Store) loadShards() error { } resC <- &res{s: shard} - s.Logger.Info(fmt.Sprintf("%s opened in %s", path, time.Since(start))) + s.Logger.Info("Opened shard", zap.String("path", path), zap.Duration("duration", time.Since(start))) }(db.Name(), rp.Name(), sh.Name()) } } @@ -1652,7 +1652,7 @@ func (s *Store) monitorShards() { for _, sh := range s.shards { if sh.IsIdle() { if err := sh.Free(); err != nil { - s.Logger.Warn("error free cold shard resources:", zap.Error(err)) + s.Logger.Warn("Error while freeing cold shard resources", zap.Error(err)) } } else { sh.SetCompactionsEnabled(true) @@ -1710,7 +1710,7 @@ func (s *Store) monitorShards() { indexSet := IndexSet{Indexes: []Index{firstShardIndex}, SeriesFile: sfile} names, err := indexSet.MeasurementNamesByExpr(nil, nil) if err != nil { - s.Logger.Warn("cannot retrieve measurement names", zap.Error(err)) + s.Logger.Warn("Cannot retrieve measurement names", zap.Error(err)) return nil } @@ -1725,8 +1725,13 @@ func (s *Store) monitorShards() { // Log at 80, 85, 90-100% levels if perc == 80 || perc == 85 || perc >= 90 { - s.Logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s measurement=%s tag=%s", - perc, n, s.EngineOptions.Config.MaxValuesPerTag, db, name, k)) + s.Logger.Warn("max-values-per-tag limit may be exceeded soon", + zap.String("perc", fmt.Sprintf("%d%%", perc)), + zap.Int("n", n), + zap.Int("max", s.EngineOptions.Config.MaxValuesPerTag), + zap.String("db", db), + zap.ByteString("measurement", name), + zap.ByteString("tag", k)) } return nil })