Switch logging to use structured logging everywhere

The logging library has been switched to use uber-go/zap. While the
logging has been changed to use structured logging, this commit does not
change any of the logging statements to take advantage of the new
structured log or new log levels. Those changes will come in future
commits.
pull/7671/head
Jonathan A. Sternberg 2016-12-01 12:26:23 -06:00
parent 855c567c67
commit 21502a39e8
40 changed files with 417 additions and 465 deletions

View File

@ -15,6 +15,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
- [#7601](https://github.com/influxdata/influxdb/issues/7601): Prune data in meta store for deleted shards.
- [#7669](https://github.com/influxdata/influxdb/issues/7669): Uncomment section headers from the default configuration file.
- [#7684](https://github.com/influxdata/influxdb/issues/7684): Update Go version to 1.7.4.
- [#7036](https://github.com/influxdata/influxdb/issues/7036): Switch logging to use structured logging everywhere.
### Bugfixes

2
Godeps
View File

@ -14,4 +14,6 @@ github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447
github.com/peterh/liner 8975875355a81d612fafb9f5a6037bdcc2d9b073
github.com/rakyll/statik e383bbf6b2ec1a2fb8492dfd152d945fb88919b6
github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d
github.com/uber-go/atomic 9e99152552a6ce13fa3b2ce4a9c4fb117cca4506
github.com/uber-go/zap fbae0281ffd546fa6d1959fec6075ac5da7fb577
golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd

View File

@ -16,6 +16,8 @@
- github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING)
- github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE)
- github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE)
- github.com/uber-go/atomic [MIT LICENSE](https://github.com/uber-go/atomic/blob/master/LICENSE.txt)
- github.com/uber-go/zap [MIT LICENSE](https://github.com/uber-go/zap/blob/master/LICENSE.txt)
- glyphicons [LICENSE](http://glyphicons.com/license/)
- golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)
- jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt)

View File

@ -4,7 +4,6 @@ import (
"flag"
"fmt"
"io"
"log"
"math/rand"
"os"
"os/signal"
@ -16,6 +15,7 @@ import (
"github.com/influxdata/influxdb/cmd/influxd/help"
"github.com/influxdata/influxdb/cmd/influxd/restore"
"github.com/influxdata/influxdb/cmd/influxd/run"
"github.com/uber-go/zap"
)
// These variables are populated via the Go linker.
@ -50,7 +50,7 @@ func main() {
// Main represents the program execution.
type Main struct {
Logger *log.Logger
Logger zap.Logger
Stdin io.Reader
Stdout io.Writer
@ -60,7 +60,10 @@ type Main struct {
// NewMain return a new instance of Main.
func NewMain() *Main {
return &Main{
Logger: log.New(os.Stderr, "[run] ", log.LstdFlags),
Logger: zap.New(
zap.NewTextEncoder(),
zap.Output(os.Stderr),
),
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
@ -80,6 +83,7 @@ func (m *Main) Run(args ...string) error {
cmd.Version = version
cmd.Commit = commit
cmd.Branch = branch
cmd.Logger = m.Logger
if err := cmd.Run(args...); err != nil {
return fmt.Errorf("run: %s", err)
@ -87,12 +91,12 @@ func (m *Main) Run(args ...string) error {
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
m.Logger.Println("Listening for signals")
m.Logger.Info("Listening for signals")
// Block until one of the signals above is received
select {
case <-signalCh:
m.Logger.Println("Signal received, initializing clean shutdown...")
m.Logger.Info("Signal received, initializing clean shutdown...")
go func() {
cmd.Close()
}()
@ -100,14 +104,14 @@ func (m *Main) Run(args ...string) error {
// Block again until another signal is received, a shutdown timeout elapses,
// or the Command is gracefully closed
m.Logger.Println("Waiting for clean shutdown...")
m.Logger.Info("Waiting for clean shutdown...")
select {
case <-signalCh:
m.Logger.Println("second signal received, initializing hard shutdown")
m.Logger.Info("second signal received, initializing hard shutdown")
case <-time.After(time.Second * 30):
m.Logger.Println("time limit reached, initializing hard shutdown")
m.Logger.Info("time limit reached, initializing hard shutdown")
case <-cmd.Closed:
m.Logger.Println("server shutdown completed")
m.Logger.Info("server shutdown completed")
}
// goodbye.

View File

@ -182,7 +182,6 @@ func (cmd *Command) unpackMeta() error {
}
client := meta.NewClient(c)
client.SetLogOutput(ioutil.Discard)
if err := client.Open(); err != nil {
return err
}

View File

@ -11,6 +11,8 @@ import (
"runtime"
"strconv"
"time"
"github.com/uber-go/zap"
)
const logo = `
@ -38,6 +40,7 @@ type Command struct {
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
Logger zap.Logger
Server *Server
}
@ -50,6 +53,7 @@ func NewCommand() *Command {
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
Logger: zap.New(zap.NullEncoder()),
}
}
@ -64,17 +68,13 @@ func (cmd *Command) Run(args ...string) error {
// Print sweet InfluxDB logo.
fmt.Print(logo)
// Configure default logging.
log.SetPrefix("[run] ")
log.SetFlags(log.LstdFlags)
// Set parallelism.
runtime.GOMAXPROCS(runtime.NumCPU())
// Mark start-up in log.
log.Printf("InfluxDB starting, version %s, branch %s, commit %s",
cmd.Version, cmd.Branch, cmd.Commit)
log.Printf("Go version %s, GOMAXPROCS set to %d", runtime.Version(), runtime.GOMAXPROCS(0))
cmd.Logger.Info(fmt.Sprintf("InfluxDB starting, version %s, branch %s, commit %s",
cmd.Version, cmd.Branch, cmd.Commit))
cmd.Logger.Info(fmt.Sprintf("Go version %s, GOMAXPROCS set to %d", runtime.Version(), runtime.GOMAXPROCS(0)))
// Write the PID file.
if err := cmd.writePIDFile(options.PIDFile); err != nil {
@ -113,6 +113,7 @@ func (cmd *Command) Run(args ...string) error {
if err != nil {
return fmt.Errorf("create server: %s", err)
}
s.Logger = cmd.Logger
s.CPUProfile = options.CPUProfile
s.MemProfile = options.MemProfile
if err := s.Open(); err != nil {
@ -192,11 +193,11 @@ func (cmd *Command) writePIDFile(path string) error {
func (cmd *Command) ParseConfig(path string) (*Config, error) {
// Use demo configuration if no config path is specified.
if path == "" {
log.Println("no configuration provided, using default settings")
cmd.Logger.Info("no configuration provided, using default settings")
return NewDemoConfig()
}
log.Printf("Using configuration at: %s\n", path)
cmd.Logger.Info(fmt.Sprintf("Using configuration at: %s\n", path))
config := NewConfig()
if err := config.FromTomlFile(path); err != nil {

View File

@ -31,6 +31,7 @@ import (
"github.com/influxdata/influxdb/tcp"
"github.com/influxdata/influxdb/tsdb"
client "github.com/influxdata/usage-client/v1"
"github.com/uber-go/zap"
// Initialize the engine packages
_ "github.com/influxdata/influxdb/tsdb/engine"
)
@ -61,7 +62,7 @@ type Server struct {
BindAddress string
Listener net.Listener
Logger *log.Logger
Logger zap.Logger
MetaClient *meta.Client
@ -94,10 +95,6 @@ type Server struct {
tcpAddr string
config *Config
// logOutput is the writer to which all services should be configured to
// write logs to after appension.
logOutput io.Writer
}
// NewServer returns a new instance of Server built from a config.
@ -143,7 +140,10 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
BindAddress: bind,
Logger: log.New(os.Stderr, "", log.LstdFlags),
Logger: zap.New(
zap.NewTextEncoder(),
zap.Output(os.Stderr),
),
MetaClient: meta.NewClient(c.Meta),
@ -153,8 +153,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
httpUseTLS: c.HTTPD.HTTPSEnabled,
tcpAddr: bind,
config: c,
logOutput: os.Stderr,
config: c,
}
s.Monitor = monitor.New(s, c.Monitor)
@ -227,8 +226,7 @@ func (s *Server) appendSnapshotterService() {
// SetLogOutput sets the logger used for all messages. It must not be called
// after the Open method has been called.
func (s *Server) SetLogOutput(w io.Writer) {
s.Logger = log.New(os.Stderr, "", log.LstdFlags)
s.logOutput = w
s.Logger = zap.New(zap.NewTextEncoder(), zap.Output(zap.AddSync(w)))
}
func (s *Server) appendMonitorService() {
@ -396,21 +394,20 @@ func (s *Server) Open() error {
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
// Configure logging for all services and clients.
w := s.logOutput
if s.config.Meta.LoggingEnabled {
s.MetaClient.SetLogOutput(w)
s.MetaClient.WithLogger(s.Logger)
}
s.TSDBStore.SetLogOutput(w)
s.TSDBStore.WithLogger(s.Logger)
if s.config.Data.QueryLogEnabled {
s.QueryExecutor.SetLogOutput(w)
s.QueryExecutor.WithLogger(s.Logger)
}
s.PointsWriter.SetLogOutput(w)
s.Subscriber.SetLogOutput(w)
s.PointsWriter.WithLogger(s.Logger)
s.Subscriber.WithLogger(s.Logger)
for _, svc := range s.Services {
svc.SetLogOutput(w)
svc.WithLogger(s.Logger)
}
s.SnapshotterService.SetLogOutput(w)
s.Monitor.SetLogOutput(w)
s.SnapshotterService.WithLogger(s.Logger)
s.Monitor.WithLogger(s.Logger)
// Open TSDB store.
if err := s.TSDBStore.Open(); err != nil {
@ -539,7 +536,7 @@ func (s *Server) reportServer() {
},
}
s.Logger.Printf("Sending usage statistics to usage.influxdata.com")
s.Logger.Info("Sending usage statistics to usage.influxdata.com")
go cl.Save(usage)
}
@ -561,7 +558,7 @@ func (s *Server) monitorErrorChan(ch <-chan error) {
// Service represents a service attached to the server.
type Service interface {
SetLogOutput(w io.Writer)
WithLogger(log zap.Logger)
Open() error
Close() error
}

View File

@ -2,9 +2,7 @@ package coordinator
import (
"errors"
"io"
"log"
"os"
"fmt"
"sort"
"sync"
"sync/atomic"
@ -14,6 +12,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/uber-go/zap"
)
// The statistics generated by the "write" mdoule
@ -46,7 +45,7 @@ type PointsWriter struct {
mu sync.RWMutex
closing chan struct{}
WriteTimeout time.Duration
Logger *log.Logger
Logger zap.Logger
Node *influxdb.Node
@ -97,7 +96,7 @@ func NewPointsWriter() *PointsWriter {
return &PointsWriter{
closing: make(chan struct{}),
WriteTimeout: DefaultWriteTimeout,
Logger: log.New(os.Stderr, "[write] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
stats: &WriteStatistics{},
}
}
@ -149,10 +148,8 @@ func (w *PointsWriter) Close() error {
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (w *PointsWriter) SetLogOutput(lw io.Writer) {
w.Logger = log.New(lw, "[write] ", log.LstdFlags)
func (w *PointsWriter) WithLogger(log zap.Logger) {
w.Logger = log.With(zap.String("service", "write"))
}
// WriteStatistics keeps statistics related to the PointsWriter.
@ -359,7 +356,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
if err == tsdb.ErrShardNotFound {
err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true)
if err != nil {
w.Logger.Printf("write failed for shard %d: %v", shard.ID, err)
w.Logger.Info(fmt.Sprintf("write failed for shard %d: %v", shard.ID, err))
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
@ -367,7 +364,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
}
err = w.TSDBStore.WriteToShard(shard.ID, points)
if err != nil {
w.Logger.Printf("write failed for shard %d: %v", shard.ID, err)
w.Logger.Info(fmt.Sprintf("write failed for shard %d: %v", shard.ID, err))
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}

View File

@ -4,7 +4,6 @@ import (
"bytes"
"errors"
"io"
"log"
"os"
"reflect"
"testing"
@ -17,6 +16,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/uber-go/zap"
)
const (
@ -200,7 +200,10 @@ func NewQueryExecutor() *QueryExecutor {
if testing.Verbose() {
out = io.MultiWriter(out, os.Stderr)
}
e.QueryExecutor.Logger = log.New(out, "[query] ", log.LstdFlags)
e.QueryExecutor.WithLogger(zap.New(
zap.NewTextEncoder(),
zap.Output(os.Stderr),
))
return e
}

View File

@ -3,15 +3,13 @@ package influxql
import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/influxdb/models"
"github.com/uber-go/zap"
)
var (
@ -97,7 +95,7 @@ type ExecutionContext struct {
Results chan *Result
// Hold the query executor's logger.
Log *log.Logger
Log zap.Logger
// A channel that is closed when the query is interrupted.
InterruptCh <-chan struct{}
@ -154,7 +152,7 @@ type QueryExecutor struct {
// Logger to use for all logging.
// Defaults to discarding all log output.
Logger *log.Logger
Logger zap.Logger
// expvar-based stats.
stats *QueryStatistics
@ -164,7 +162,7 @@ type QueryExecutor struct {
func NewQueryExecutor() *QueryExecutor {
return &QueryExecutor{
TaskManager: NewTaskManager(),
Logger: log.New(ioutil.Discard, "[query] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
stats: &QueryStatistics{},
}
}
@ -198,8 +196,8 @@ func (e *QueryExecutor) Close() error {
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (e *QueryExecutor) SetLogOutput(w io.Writer) {
e.Logger = log.New(w, "[query] ", log.LstdFlags)
func (e *QueryExecutor) WithLogger(log zap.Logger) {
e.Logger = log.With(zap.String("service", "query"))
e.TaskManager.Logger = e.Logger
}
@ -307,7 +305,7 @@ LOOP:
// Log each normalized statement.
if !ctx.Quiet {
e.Logger.Println(stmt.String())
e.Logger.Info(stmt.String())
}
// Send any other statements to the underlying statement executor.
@ -361,7 +359,7 @@ LOOP:
func (e *QueryExecutor) recover(query *Query, results chan *Result) {
if err := recover(); err != nil {
e.Logger.Printf("%s [panic:%s] %s", query.String(), err, debug.Stack())
e.Logger.Error(fmt.Sprintf("%s [panic:%s] %s", query.String(), err, debug.Stack()))
results <- &Result{
StatementID: -1,
Err: fmt.Errorf("%s [panic:%s]", query.String(), err),

View File

@ -2,12 +2,11 @@ package influxql
import (
"fmt"
"io/ioutil"
"log"
"sync"
"time"
"github.com/influxdata/influxdb/models"
"github.com/uber-go/zap"
)
const (
@ -30,7 +29,7 @@ type TaskManager struct {
// Logger to use for all logging.
// Defaults to discarding all log output.
Logger *log.Logger
Logger zap.Logger
// Used for managing and tracking running queries.
queries map[uint64]*QueryTask
@ -43,7 +42,7 @@ type TaskManager struct {
func NewTaskManager() *TaskManager {
return &TaskManager{
QueryTimeout: DefaultQueryTimeout,
Logger: log.New(ioutil.Discard, "[query] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
queries: make(map[uint64]*QueryTask),
nextID: 1,
}
@ -157,8 +156,8 @@ func (t *TaskManager) AttachQuery(q *Query, database string, interrupt <-chan st
select {
case <-timer.C:
t.Logger.Printf("Detected slow query: %s (qid: %d, database: %s, threshold: %s)",
query.query, qid, query.database, t.LogQueriesAfter)
t.Logger.Warn(fmt.Sprintf("Detected slow query: %s (qid: %d, database: %s, threshold: %s)",
query.query, qid, query.database, t.LogQueriesAfter))
case <-closing:
}
return nil

View File

@ -4,8 +4,6 @@ import (
"errors"
"expvar"
"fmt"
"io"
"log"
"os"
"runtime"
"sort"
@ -16,6 +14,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/services/meta"
"github.com/uber-go/zap"
)
// Policy constants.
@ -58,7 +57,7 @@ type Monitor struct {
// Writer for pushing stats back into the database.
PointsWriter PointsWriter
Logger *log.Logger
Logger zap.Logger
}
// PointsWriter is a simplified interface for writing the points the monitor gathers
@ -76,7 +75,7 @@ func New(r Reporter, c Config) *Monitor {
storeDatabase: c.StoreDatabase,
storeInterval: time.Duration(c.StoreInterval),
storeRetentionPolicy: MonitorRetentionPolicy,
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
}
}
@ -90,11 +89,11 @@ func (m *Monitor) open() bool {
// for identification purpose.
func (m *Monitor) Open() error {
if m.open() {
m.Logger.Println("Monitor is already open")
m.Logger.Info("Monitor is already open")
return nil
}
m.Logger.Printf("Starting monitor system")
m.Logger.Info("Starting monitor system")
// Self-register various stats and diagnostics.
m.RegisterDiagnosticsClient("build", &build{
@ -124,11 +123,11 @@ func (m *Monitor) Open() error {
// Close closes the monitor system.
func (m *Monitor) Close() error {
if !m.open() {
m.Logger.Println("Monitor is already closed.")
m.Logger.Info("Monitor is already closed.")
return nil
}
m.Logger.Println("shutting down monitor system")
m.Logger.Info("shutting down monitor system")
m.mu.Lock()
close(m.done)
m.mu.Unlock()
@ -177,10 +176,8 @@ func (m *Monitor) SetPointsWriter(pw PointsWriter) error {
return m.Open()
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (m *Monitor) SetLogOutput(w io.Writer) {
m.Logger = log.New(w, "[monitor] ", log.LstdFlags)
func (m *Monitor) WithLogger(log zap.Logger) {
m.Logger = log.With(zap.String("service", "monitor"))
}
// RegisterDiagnosticsClient registers a diagnostics client with the given name and tags.
@ -188,7 +185,7 @@ func (m *Monitor) RegisterDiagnosticsClient(name string, client diagnostics.Clie
m.mu.Lock()
defer m.mu.Unlock()
m.diagRegistrations[name] = client
m.Logger.Printf(`'%s' registered for diagnostics monitoring`, name)
m.Logger.Info(fmt.Sprintf(`'%s' registered for diagnostics monitoring`, name))
}
// DeregisterDiagnosticsClient deregisters a diagnostics client by name.
@ -352,8 +349,8 @@ func (m *Monitor) createInternalStorage() {
}
if _, err := m.MetaClient.CreateDatabaseWithRetentionPolicy(m.storeDatabase, &spec); err != nil {
m.Logger.Printf("failed to create database '%s', failed to create storage: %s",
m.storeDatabase, err.Error())
m.Logger.Info(fmt.Sprintf("failed to create database '%s', failed to create storage: %s",
m.storeDatabase, err.Error()))
return
}
}
@ -380,8 +377,8 @@ func (m *Monitor) waitUntilInterval(d time.Duration) error {
// storeStatistics writes the statistics to an InfluxDB system.
func (m *Monitor) storeStatistics() {
defer m.wg.Done()
m.Logger.Printf("Storing statistics in database '%s' retention policy '%s', at interval %s",
m.storeDatabase, m.storeRetentionPolicy, m.storeInterval)
m.Logger.Info(fmt.Sprintf("Storing statistics in database '%s' retention policy '%s', at interval %s",
m.storeDatabase, m.storeRetentionPolicy, m.storeInterval))
hostname, _ := os.Hostname()
m.SetGlobalTag("hostname", hostname)
@ -407,7 +404,7 @@ func (m *Monitor) storeStatistics() {
stats, err := m.Statistics(m.globalTags)
if err != nil {
m.Logger.Printf("failed to retrieve registered statistics: %s", err)
m.Logger.Info(fmt.Sprintf("failed to retrieve registered statistics: %s", err))
return
}
@ -415,7 +412,7 @@ func (m *Monitor) storeStatistics() {
for _, s := range stats {
pt, err := models.NewPoint(s.Name, models.NewTags(s.Tags), s.Values, now)
if err != nil {
m.Logger.Printf("Dropping point %v: %v", s.Name, err)
m.Logger.Info(fmt.Sprintf("Dropping point %v: %v", s.Name, err))
return
}
points = append(points, pt)
@ -426,11 +423,11 @@ func (m *Monitor) storeStatistics() {
defer m.mu.RUnlock()
if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, points); err != nil {
m.Logger.Printf("failed to store statistics: %s", err)
m.Logger.Info(fmt.Sprintf("failed to store statistics: %s", err))
}
}()
case <-m.done:
m.Logger.Printf("terminating storage of statistics")
m.Logger.Info(fmt.Sprintf("terminating storage of statistics"))
return
}
}

View File

@ -3,16 +3,14 @@ package admin // import "github.com/influxdata/influxdb/services/admin"
import (
"crypto/tls"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"strings"
// Register static assets via statik.
_ "github.com/influxdata/influxdb/services/admin/statik"
"github.com/rakyll/statik/fs"
"github.com/uber-go/zap"
)
// Service manages the listener for an admin endpoint.
@ -24,7 +22,7 @@ type Service struct {
err chan error
version string
logger *log.Logger
logger zap.Logger
}
// NewService returns a new instance of Service.
@ -35,14 +33,14 @@ func NewService(c Config) *Service {
cert: c.HTTPSCertificate,
err: make(chan error),
version: c.Version,
logger: log.New(os.Stderr, "[admin] ", log.LstdFlags),
logger: zap.New(zap.NullEncoder()),
}
}
// Open starts the service
func (s *Service) Open() error {
s.logger.Printf("Starting admin service")
s.logger.Println("DEPRECATED: This plugin is deprecated as of 1.1.0 and will be removed in a future release")
s.logger.Info("Starting admin service")
s.logger.Info("DEPRECATED: This plugin is deprecated as of 1.1.0 and will be removed in a future release")
// Open listener.
if s.https {
@ -58,7 +56,7 @@ func (s *Service) Open() error {
return err
}
s.logger.Println("Listening on HTTPS:", listener.Addr().String())
s.logger.Info(fmt.Sprint("Listening on HTTPS: ", listener.Addr().String()))
s.listener = listener
} else {
listener, err := net.Listen("tcp", s.addr)
@ -66,7 +64,7 @@ func (s *Service) Open() error {
return err
}
s.logger.Println("Listening on HTTP:", listener.Addr().String())
s.logger.Info(fmt.Sprint("Listening on HTTP: ", listener.Addr().String()))
s.listener = listener
}
@ -83,10 +81,8 @@ func (s *Service) Close() error {
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
s.logger = log.New(w, "[admin] ", log.LstdFlags)
func (s *Service) WithLogger(log zap.Logger) {
s.logger = log.With(zap.String("service", "admin"))
}
// Err returns a channel for fatal errors that occur on the listener.

View File

@ -2,9 +2,7 @@ package collectd // import "github.com/influxdata/influxdb/services/collectd"
import (
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
@ -18,6 +16,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/uber-go/zap"
)
// statistics gathered by the collectd service.
@ -58,7 +57,7 @@ type Service struct {
Config *Config
MetaClient metaClient
PointsWriter pointsWriter
Logger *log.Logger
Logger zap.Logger
wg sync.WaitGroup
conn *net.UDPConn
@ -81,7 +80,7 @@ func NewService(c Config) *Service {
// Use defaults where necessary.
Config: c.WithDefaults(),
Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
stats: &Statistics{},
defaultTags: models.StatisticTags{"bind": c.BindAddress},
}
@ -99,7 +98,7 @@ func (s *Service) Open() error {
}
s.done = make(chan struct{})
s.Logger.Printf("Starting collectd service")
s.Logger.Info("Starting collectd service")
if s.Config.BindAddress == "" {
return fmt.Errorf("bind address is blank")
@ -119,7 +118,7 @@ func (s *Service) Open() error {
readdir = func(path string) {
files, err := ioutil.ReadDir(path)
if err != nil {
s.Logger.Printf("Unable to read directory %s: %s\n", path, err)
s.Logger.Info(fmt.Sprintf("Unable to read directory %s: %s\n", path, err))
return
}
@ -130,10 +129,10 @@ func (s *Service) Open() error {
continue
}
s.Logger.Printf("Loading %s\n", fullpath)
s.Logger.Info(fmt.Sprintf("Loading %s\n", fullpath))
types, err := TypesDBFile(fullpath)
if err != nil {
s.Logger.Printf("Unable to parse collectd types file: %s\n", f.Name())
s.Logger.Info(fmt.Sprintf("Unable to parse collectd types file: %s\n", f.Name()))
continue
}
@ -143,7 +142,7 @@ func (s *Service) Open() error {
readdir(s.Config.TypesDB)
s.popts.TypesDB = alltypesdb
} else {
s.Logger.Printf("Loading %s\n", s.Config.TypesDB)
s.Logger.Info(fmt.Sprintf("Loading %s\n", s.Config.TypesDB))
types, err := TypesDBFile(s.Config.TypesDB)
if err != nil {
return fmt.Errorf("Open(): %s", err)
@ -190,7 +189,7 @@ func (s *Service) Open() error {
}
s.conn = conn
s.Logger.Println("Listening on UDP: ", conn.LocalAddr().String())
s.Logger.Info(fmt.Sprint("Listening on UDP: ", conn.LocalAddr().String()))
// Start the points batcher.
s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration))
@ -227,7 +226,7 @@ func (s *Service) Close() error {
// Release all remaining resources.
s.conn = nil
s.batcher = nil
s.Logger.Println("collectd UDP closed")
s.Logger.Info("collectd UDP closed")
s.done = nil
return nil
}
@ -262,10 +261,8 @@ func (s *Service) createInternalStorage() error {
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
s.Logger = log.New(w, "[collectd] ", log.LstdFlags)
func (s *Service) WithLogger(log zap.Logger) {
s.Logger = log.With(zap.String("service", "collectd"))
}
// Statistics maintains statistics for the collectd service.
@ -333,7 +330,7 @@ func (s *Service) serve() {
n, _, err := s.conn.ReadFromUDP(buffer)
if err != nil {
atomic.AddInt64(&s.stats.ReadFail, 1)
s.Logger.Printf("collectd ReadFromUDP error: %s", err)
s.Logger.Info(fmt.Sprintf("collectd ReadFromUDP error: %s", err))
continue
}
if n > 0 {
@ -347,7 +344,7 @@ func (s *Service) handleMessage(buffer []byte) {
valueLists, err := network.Parse(buffer, s.popts)
if err != nil {
atomic.AddInt64(&s.stats.PointsParseFail, 1)
s.Logger.Printf("Collectd parse error: %s", err)
s.Logger.Info(fmt.Sprintf("Collectd parse error: %s", err))
return
}
for _, valueList := range valueLists {
@ -367,7 +364,7 @@ func (s *Service) writePoints() {
case batch := <-s.batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Printf("Required database %s not yet created: %s", s.Config.Database, err.Error())
s.Logger.Info(fmt.Sprintf("Required database %s not yet created: %s", s.Config.Database, err.Error()))
continue
}
@ -375,7 +372,7 @@ func (s *Service) writePoints() {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.Logger.Printf("failed to write point batch to database %q: %s", s.Config.Database, err)
s.Logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.Config.Database, err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}
}
@ -419,7 +416,7 @@ func (s *Service) UnmarshalValueList(vl *api.ValueList) []models.Point {
// Drop invalid points
p, err := models.NewPoint(name, models.NewTags(tags), fields, timestamp)
if err != nil {
s.Logger.Printf("Dropping point %v: %v", name, err)
s.Logger.Info(fmt.Sprintf("Dropping point %v: %v", name, err))
atomic.AddInt64(&s.stats.InvalidDroppedPoints, 1)
continue
}

View File

@ -3,9 +3,8 @@ package collectd
import (
"encoding/hex"
"errors"
"io/ioutil"
"log"
"net"
"os"
"strings"
"testing"
"time"
@ -14,6 +13,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/toml"
"github.com/uber-go/zap"
)
func TestService_OpenClose(t *testing.T) {
@ -299,8 +299,11 @@ func NewTestService(batchSize int, batchDuration time.Duration) *TestService {
panic(err)
}
if !testing.Verbose() {
s.Service.Logger = log.New(ioutil.Discard, "", log.LstdFlags)
if testing.Verbose() {
s.Service.WithLogger(zap.New(
zap.NewTextEncoder(),
zap.Output(os.Stderr),
))
}
return s

View File

@ -3,9 +3,6 @@ package continuous_querier // import "github.com/influxdata/influxdb/services/co
import (
"errors"
"fmt"
"io"
"log"
"os"
"strings"
"sync"
"sync/atomic"
@ -14,6 +11,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/uber-go/zap"
)
const (
@ -76,7 +74,7 @@ type Service struct {
RunInterval time.Duration
// RunCh can be used by clients to signal service to run CQs.
RunCh chan *RunRequest
Logger *log.Logger
Logger zap.Logger
loggingEnabled bool
stats *Statistics
// lastRuns maps CQ name to last time it was run.
@ -93,7 +91,7 @@ func NewService(c Config) *Service {
RunInterval: time.Duration(c.RunInterval),
RunCh: make(chan *RunRequest),
loggingEnabled: c.LogEnabled,
Logger: log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
stats: &Statistics{},
lastRuns: map[string]time.Time{},
}
@ -103,7 +101,7 @@ func NewService(c Config) *Service {
// Open starts the service.
func (s *Service) Open() error {
s.Logger.Println("Starting continuous query service")
s.Logger.Info("Starting continuous query service")
if s.stop != nil {
return nil
@ -131,10 +129,8 @@ func (s *Service) Close() error {
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
s.Logger = log.New(w, "[continuous_querier] ", log.LstdFlags)
func (s *Service) WithLogger(log zap.Logger) {
s.Logger = log.With(zap.String("service", "continuous_querier"))
}
// Statistics maintains the statistics for the continuous query service.
@ -202,14 +198,14 @@ func (s *Service) backgroundLoop() {
for {
select {
case <-s.stop:
s.Logger.Println("continuous query service terminating")
s.Logger.Info("continuous query service terminating")
return
case req := <-s.RunCh:
if !s.hasContinuousQueries() {
continue
}
if _, err := s.MetaClient.AcquireLease(leaseName); err == nil {
s.Logger.Printf("running continuous queries by request for time: %v", req.Now)
s.Logger.Info(fmt.Sprintf("running continuous queries by request for time: %v", req.Now))
s.runContinuousQueries(req)
}
case <-t.C:
@ -250,7 +246,7 @@ func (s *Service) runContinuousQueries(req *RunRequest) {
continue
}
if err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil {
s.Logger.Printf("error executing query: %s: err = %s", cq.Query, err)
s.Logger.Info(fmt.Sprintf("error executing query: %s: err = %s", cq.Query, err))
atomic.AddInt64(&s.stats.QueryFail, 1)
} else {
atomic.AddInt64(&s.stats.QueryOK, 1)
@ -339,24 +335,24 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
}
if err := cq.q.SetTimeRange(startTime, endTime); err != nil {
s.Logger.Printf("error setting time range: %s\n", err)
s.Logger.Info(fmt.Sprintf("error setting time range: %s\n", err))
return err
}
var start time.Time
if s.loggingEnabled {
s.Logger.Printf("executing continuous query %s (%v to %v)", cq.Info.Name, startTime, endTime)
s.Logger.Info(fmt.Sprintf("executing continuous query %s (%v to %v)", cq.Info.Name, startTime, endTime))
start = time.Now()
}
// Do the actual processing of the query & writing of results.
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
s.Logger.Printf("error: %s. running: %s\n", err, cq.q.String())
s.Logger.Info(fmt.Sprintf("error: %s. running: %s\n", err, cq.q.String()))
return err
}
if s.loggingEnabled {
s.Logger.Printf("finished continuous query %s (%v to %v) in %s", cq.Info.Name, startTime, endTime, time.Now().Sub(start))
s.Logger.Info(fmt.Sprintf("finished continuous query %s (%v to %v) in %s", cq.Info.Name, startTime, endTime, time.Now().Sub(start)))
}
return nil
}

View File

@ -3,8 +3,7 @@ package continuous_querier
import (
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"sync"
"testing"
"time"
@ -13,6 +12,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/uber-go/zap"
)
var (
@ -391,8 +391,11 @@ func NewTestService(t *testing.T) *Service {
s.RunInterval = time.Millisecond
// Set Logger to write to dev/null so stdout isn't polluted.
if !testing.Verbose() {
s.Logger = log.New(ioutil.Discard, "", log.LstdFlags)
if testing.Verbose() {
s.WithLogger(zap.New(
zap.NewTextEncoder(),
zap.Output(os.Stderr),
))
}
// Add a couple test databases and CQs.

View File

@ -3,11 +3,8 @@ package graphite // import "github.com/influxdata/influxdb/services/graphite"
import (
"bufio"
"fmt"
"io"
"log"
"math"
"net"
"os"
"strings"
"sync"
"sync/atomic"
@ -17,6 +14,7 @@ import (
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/uber-go/zap"
)
const udpBufferSize = 65536
@ -57,7 +55,7 @@ type Service struct {
batcher *tsdb.PointBatcher
parser *Parser
logger *log.Logger
logger zap.Logger
stats *Statistics
defaultTags models.StatisticTags
@ -105,7 +103,7 @@ func NewService(c Config) (*Service, error) {
batchPending: d.BatchPending,
udpReadBuffer: d.UDPReadBuffer,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, fmt.Sprintf("[graphite] %s ", d.BindAddress), log.LstdFlags),
logger: zap.New(zap.NullEncoder()),
stats: &Statistics{},
defaultTags: models.StatisticTags{"proto": d.Protocol, "bind": d.BindAddress},
tcpConnections: make(map[string]*tcpConnection),
@ -135,7 +133,7 @@ func (s *Service) Open() error {
}
s.done = make(chan struct{})
s.logger.Printf("Starting graphite service, batch size %d, batch timeout %s", s.batchSize, s.batchTimeout)
s.logger.Info(fmt.Sprintf("Starting graphite service, batch size %d, batch timeout %s", s.batchSize, s.batchTimeout))
// Register diagnostics if a Monitor service is available.
if s.Monitor != nil {
@ -161,7 +159,7 @@ func (s *Service) Open() error {
return err
}
s.logger.Printf("Listening on %s: %s", strings.ToUpper(s.protocol), s.addr.String())
s.logger.Info(fmt.Sprintf("Listening on %s: %s", strings.ToUpper(s.protocol), s.addr.String()))
return nil
}
func (s *Service) closeAllConnections() {
@ -252,10 +250,11 @@ func (s *Service) createInternalStorage() error {
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
s.logger = log.New(w, "[graphite] ", log.LstdFlags)
func (s *Service) WithLogger(log zap.Logger) {
s.logger = log.With(
zap.String("service", "graphite"),
zap.String("addr", s.bindAddress),
)
}
// Statistics maintains statistics for the graphite service.
@ -309,11 +308,11 @@ func (s *Service) openTCPServer() (net.Addr, error) {
for {
conn, err := s.ln.Accept()
if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
s.logger.Println("graphite TCP listener closed")
s.logger.Info("graphite TCP listener closed")
return
}
if err != nil {
s.logger.Println("error accepting TCP connection", err.Error())
s.logger.Info("error accepting TCP connection", zap.Error(err))
continue
}
@ -424,7 +423,7 @@ func (s *Service) handleLine(line string) {
return
}
}
s.logger.Printf("unable to parse line: %s: %s", line, err)
s.logger.Info(fmt.Sprintf("unable to parse line: %s: %s", line, err))
atomic.AddInt64(&s.stats.PointsParseFail, 1)
return
}
@ -440,7 +439,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
case batch := <-batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.logger.Printf("Required database or retention policy do not yet exist: %s", err.Error())
s.logger.Info(fmt.Sprintf("Required database or retention policy do not yet exist: %s", err.Error()))
continue
}
@ -448,7 +447,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.logger.Printf("failed to write point batch to database %q: %s", s.database, err)
s.logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.database, err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}

View File

@ -3,8 +3,8 @@ package graphite
import (
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"sync"
"testing"
"time"
@ -13,6 +13,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/toml"
"github.com/uber-go/zap"
)
func Test_Service_OpenClose(t *testing.T) {
@ -287,8 +288,11 @@ func NewTestService(c *Config) *TestService {
return nil, nil
}
if !testing.Verbose() {
service.Service.SetLogOutput(ioutil.Discard)
if testing.Verbose() {
service.Service.WithLogger(zap.New(
zap.NewTextEncoder(),
zap.Output(os.Stderr),
))
}
// Set the Meta Client and PointsWriter.

View File

@ -27,6 +27,7 @@ import (
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/uuid"
"github.com/uber-go/zap"
)
const (
@ -89,7 +90,7 @@ type Handler struct {
}
Config *Config
Logger *log.Logger
Logger zap.Logger
CLFLogger *log.Logger
stats *Statistics
}
@ -99,7 +100,7 @@ func NewHandler(c Config) *Handler {
h := &Handler{
mux: pat.New(),
Config: &c,
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
CLFLogger: log.New(os.Stderr, "[httpd] ", 0),
stats: &Statistics{},
}
@ -358,7 +359,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
if h.Config.AuthEnabled {
if err := h.QueryAuthorizer.AuthorizeQuery(user, query, db); err != nil {
if err, ok := err.(meta.ErrAuthorize); ok {
h.Logger.Printf("Unauthorized request | user: %q | query: %q | database %q\n", err.User, err.Query.String(), err.Database)
h.Logger.Info(fmt.Sprintf("Unauthorized request | user: %q | query: %q | database %q", err.User, err.Query.String(), err.Database))
}
h.httpError(rw, "error authorizing query: "+err.Error(), http.StatusForbidden)
return
@ -557,7 +558,7 @@ func (h *Handler) async(query *influxql.Query, results <-chan *influxql.Result)
if r.Err == influxql.ErrNotExecuted {
continue
}
h.Logger.Printf("error while running async query: %s: %s", query, r.Err)
h.Logger.Info(fmt.Sprintf("error while running async query: %s: %s", query, r.Err))
}
}
}
@ -619,7 +620,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
_, err := buf.ReadFrom(body)
if err != nil {
if h.Config.WriteTracing {
h.Logger.Print("Write handler unable to read bytes from request body")
h.Logger.Info("Write handler unable to read bytes from request body")
}
h.httpError(w, err.Error(), http.StatusBadRequest)
return
@ -627,7 +628,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len()))
if h.Config.WriteTracing {
h.Logger.Printf("Write body received by handler: %s", buf.Bytes())
h.Logger.Info(fmt.Sprintf("Write body received by handler: %s", buf.Bytes()))
}
points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
@ -693,7 +694,7 @@ func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
// serveStatus has been deprecated
func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
h.Logger.Printf("WARNING: /status has been deprecated. Use /ping instead.")
h.Logger.Info("WARNING: /status has been deprecated. Use /ping instead.")
atomic.AddInt64(&h.stats.StatusRequests, 1)
h.writeHeader(w, http.StatusNoContent)
}
@ -939,7 +940,7 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
h.httpError(w, "problem authenticating token", http.StatusInternalServerError)
h.Logger.Print("Could not assert JWT token claims as jwt.MapClaims")
h.Logger.Info("Could not assert JWT token claims as jwt.MapClaims")
return
}
@ -1094,7 +1095,7 @@ func (h *Handler) recovery(inner http.Handler, name string) http.Handler {
if err := recover(); err != nil {
logLine := buildLogLine(l, r, start)
logLine = fmt.Sprintf("%s [panic:%s] %s", logLine, err, debug.Stack())
h.Logger.Println(logLine)
h.CLFLogger.Println(logLine)
}
}()

View File

@ -3,8 +3,6 @@ package httpd // import "github.com/influxdata/influxdb/services/httpd"
import (
"crypto/tls"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
@ -15,6 +13,7 @@ import (
"time"
"github.com/influxdata/influxdb/models"
"github.com/uber-go/zap"
)
// statistics gathered by the httpd package.
@ -55,7 +54,7 @@ type Service struct {
Handler *Handler
Logger *log.Logger
Logger zap.Logger
}
// NewService returns a new instance of Service.
@ -70,7 +69,7 @@ func NewService(c Config) *Service {
unixSocket: c.UnixSocketEnabled,
bindSocket: c.BindSocket,
Handler: NewHandler(c),
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
}
if s.key == "" {
s.key = s.cert
@ -81,8 +80,8 @@ func NewService(c Config) *Service {
// Open starts the service
func (s *Service) Open() error {
s.Logger.Println("Starting HTTP service")
s.Logger.Println("Authentication enabled:", s.Handler.Config.AuthEnabled)
s.Logger.Info("Starting HTTP service")
s.Logger.Info(fmt.Sprint("Authentication enabled:", s.Handler.Config.AuthEnabled))
// Open listener.
if s.https {
@ -98,7 +97,7 @@ func (s *Service) Open() error {
return err
}
s.Logger.Println("Listening on HTTPS:", listener.Addr().String())
s.Logger.Info(fmt.Sprint("Listening on HTTPS:", listener.Addr().String()))
s.ln = listener
} else {
listener, err := net.Listen("tcp", s.addr)
@ -106,7 +105,7 @@ func (s *Service) Open() error {
return err
}
s.Logger.Println("Listening on HTTP:", listener.Addr().String())
s.Logger.Info(fmt.Sprint("Listening on HTTP:", listener.Addr().String()))
s.ln = listener
}
@ -127,7 +126,7 @@ func (s *Service) Open() error {
return err
}
s.Logger.Println("Listening on unix socket:", listener.Addr().String())
s.Logger.Info(fmt.Sprint("Listening on unix socket:", listener.Addr().String()))
s.unixSocketListener = listener
go s.serveUnixSocket()
@ -173,10 +172,9 @@ func (s *Service) Close() error {
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
l := log.New(w, "[httpd] ", log.LstdFlags)
s.Logger = l
s.Handler.Logger = l
func (s *Service) WithLogger(log zap.Logger) {
s.Logger = log.With(zap.String("service", "httpd"))
s.Handler.Logger = s.Logger
}
// Err returns a channel for fatal errors that occur on the listener.

View File

@ -8,7 +8,6 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
@ -19,6 +18,7 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/uber-go/zap"
"golang.org/x/crypto/bcrypt"
)
@ -45,7 +45,7 @@ var (
// Client is used to execute commands on and read data from
// a meta service cluster.
type Client struct {
logger *log.Logger
logger zap.Logger
mu sync.RWMutex
closing chan struct{}
@ -75,7 +75,7 @@ func NewClient(config *Config) *Client {
},
closing: make(chan struct{}),
changed: make(chan struct{}),
logger: log.New(ioutil.Discard, "[metaclient] ", log.LstdFlags),
logger: zap.New(zap.NullEncoder()),
authCache: make(map[string]authUser, 0),
path: config.Dir,
retentionAutoCreate: config.RetentionAutoCreate,
@ -780,16 +780,16 @@ func (c *Client) PrecreateShardGroups(from, to time.Time) error {
nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
// if it already exists, continue
if sg, _ := data.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); sg != nil {
c.logger.Printf("shard group %d exists for database %s, retention policy %s", sg.ID, di.Name, rp.Name)
c.logger.Info(fmt.Sprintf("shard group %d exists for database %s, retention policy %s", sg.ID, di.Name, rp.Name))
continue
}
newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime)
if err != nil {
c.logger.Printf("failed to precreate successive shard group for group %d: %s", g.ID, err.Error())
c.logger.Info(fmt.Sprintf("failed to precreate successive shard group for group %d: %s", g.ID, err.Error()))
continue
}
changed = true
c.logger.Printf("new shard group %d successfully precreated for database %s, retention policy %s", newGroup.ID, di.Name, rp.Name)
c.logger.Info(fmt.Sprintf("new shard group %d successfully precreated for database %s, retention policy %s", newGroup.ID, di.Name, rp.Name))
}
}
}
@ -956,12 +956,10 @@ func (c *Client) MarshalBinary() ([]byte, error) {
return c.cacheData.MarshalBinary()
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (c *Client) SetLogOutput(w io.Writer) {
func (c *Client) WithLogger(log zap.Logger) {
c.mu.Lock()
defer c.mu.Unlock()
c.logger = log.New(w, "[metaclient] ", log.LstdFlags)
c.logger = log.With(zap.String("service", "metaclient"))
}
func (c *Client) updateAuthCache() {

View File

@ -5,8 +5,8 @@ import (
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"sync"
@ -15,6 +15,7 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
"github.com/uber-go/zap"
)
// Handler is an http.Handler for the service.
@ -26,7 +27,7 @@ type Handler struct {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}
Logger *log.Logger
Logger zap.Logger
stats *Statistics
}
@ -114,7 +115,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
pt, err := models.NewPoint(p.Metric, models.NewTags(p.Tags), map[string]interface{}{"value": p.Value}, ts)
if err != nil {
h.Logger.Printf("Dropping point %v: %v", p.Metric, err)
h.Logger.Info(fmt.Sprintf("Dropping point %v: %v", p.Metric, err))
if h.stats != nil {
atomic.AddInt64(&h.stats.InvalidDroppedPoints, 1)
}
@ -125,11 +126,11 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
// Write points.
if err := h.PointsWriter.WritePoints(h.Database, h.RetentionPolicy, models.ConsistencyLevelAny, points); influxdb.IsClientError(err) {
h.Logger.Println("write series error: ", err)
h.Logger.Info(fmt.Sprint("write series error: ", err))
http.Error(w, "write series error: "+err.Error(), http.StatusBadRequest)
return
} else if err != nil {
h.Logger.Println("write series error: ", err)
h.Logger.Info(fmt.Sprint("write series error: ", err))
http.Error(w, "write series error: "+err.Error(), http.StatusInternalServerError)
return
}

View File

@ -4,12 +4,11 @@ import (
"bufio"
"bytes"
"crypto/tls"
"fmt"
"io"
"log"
"net"
"net/http"
"net/textproto"
"os"
"strconv"
"strings"
"sync"
@ -19,6 +18,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/uber-go/zap"
)
// statistics gathered by the openTSDB package.
@ -72,7 +72,7 @@ type Service struct {
batcher *tsdb.PointBatcher
LogPointErrors bool
Logger *log.Logger
Logger zap.Logger
stats *Statistics
defaultTags models.StatisticTags
@ -92,7 +92,7 @@ func NewService(c Config) (*Service, error) {
batchSize: d.BatchSize,
batchPending: d.BatchPending,
batchTimeout: time.Duration(d.BatchTimeout),
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
LogPointErrors: d.LogPointErrors,
stats: &Statistics{},
defaultTags: models.StatisticTags{"bind": d.BindAddress},
@ -110,7 +110,7 @@ func (s *Service) Open() error {
}
s.done = make(chan struct{})
s.Logger.Println("Starting OpenTSDB service")
s.Logger.Info("Starting OpenTSDB service")
s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout)
s.batcher.Start()
@ -133,7 +133,7 @@ func (s *Service) Open() error {
return err
}
s.Logger.Println("Listening on TLS:", listener.Addr().String())
s.Logger.Info(fmt.Sprint("Listening on TLS: ", listener.Addr().String()))
s.ln = listener
} else {
listener, err := net.Listen("tcp", s.BindAddress)
@ -141,7 +141,7 @@ func (s *Service) Open() error {
return err
}
s.Logger.Println("Listening on:", listener.Addr().String())
s.Logger.Info(fmt.Sprint("Listening on: ", listener.Addr().String()))
s.ln = listener
}
s.httpln = newChanListener(s.ln.Addr())
@ -219,10 +219,8 @@ func (s *Service) createInternalStorage() error {
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
s.Logger = log.New(w, "[opentsdb] ", log.LstdFlags)
func (s *Service) WithLogger(log zap.Logger) {
s.Logger = log.With(zap.String("service", "opentsdb"))
}
// Statistics maintains statistics for the subscriber service.
@ -288,10 +286,10 @@ func (s *Service) serve() {
// Wait for next connection.
conn, err := s.ln.Accept()
if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
s.Logger.Println("openTSDB TCP listener closed")
s.Logger.Info("openTSDB TCP listener closed")
return
} else if err != nil {
s.Logger.Println("error accepting openTSDB: ", err.Error())
s.Logger.Info(fmt.Sprint("error accepting openTSDB: ", err.Error()))
continue
}
@ -349,7 +347,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if err != nil {
if err != io.EOF {
atomic.AddInt64(&s.stats.TelnetReadError, 1)
s.Logger.Println("error reading from openTSDB connection", err.Error())
s.Logger.Info(fmt.Sprint("error reading from openTSDB connection ", err.Error()))
}
return
}
@ -366,7 +364,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if len(inputStrs) < 4 || inputStrs[0] != "put" {
atomic.AddInt64(&s.stats.TelnetBadLine, 1)
if s.LogPointErrors {
s.Logger.Printf("malformed line '%s' from %s", line, remoteAddr)
s.Logger.Info(fmt.Sprintf("malformed line '%s' from %s", line, remoteAddr))
}
continue
}
@ -381,7 +379,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if err != nil {
atomic.AddInt64(&s.stats.TelnetBadTime, 1)
if s.LogPointErrors {
s.Logger.Printf("malformed time '%s' from %s", tsStr, remoteAddr)
s.Logger.Info(fmt.Sprintf("malformed time '%s' from %s", tsStr, remoteAddr))
}
}
@ -395,7 +393,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
default:
atomic.AddInt64(&s.stats.TelnetBadTime, 1)
if s.LogPointErrors {
s.Logger.Printf("bad time '%s' must be 10 or 13 chars, from %s ", tsStr, remoteAddr)
s.Logger.Info(fmt.Sprintf("bad time '%s' must be 10 or 13 chars, from %s ", tsStr, remoteAddr))
}
continue
}
@ -406,7 +404,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
atomic.AddInt64(&s.stats.TelnetBadTag, 1)
if s.LogPointErrors {
s.Logger.Printf("malformed tag data '%v' from %s", tagStrs[t], remoteAddr)
s.Logger.Info(fmt.Sprintf("malformed tag data '%v' from %s", tagStrs[t], remoteAddr))
}
continue
}
@ -420,7 +418,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if err != nil {
atomic.AddInt64(&s.stats.TelnetBadFloat, 1)
if s.LogPointErrors {
s.Logger.Printf("bad float '%s' from %s", valueStr, remoteAddr)
s.Logger.Info(fmt.Sprintf("bad float '%s' from %s", valueStr, remoteAddr))
}
continue
}
@ -430,7 +428,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
if err != nil {
atomic.AddInt64(&s.stats.TelnetBadFloat, 1)
if s.LogPointErrors {
s.Logger.Printf("bad float '%s' from %s", valueStr, remoteAddr)
s.Logger.Info(fmt.Sprintf("bad float '%s' from %s", valueStr, remoteAddr))
}
continue
}
@ -460,7 +458,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
case batch := <-batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Printf("Required database %s does not yet exist: %s", s.Database, err.Error())
s.Logger.Info(fmt.Sprintf("Required database %s does not yet exist: %s", s.Database, err.Error()))
continue
}
@ -468,7 +466,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.Logger.Printf("failed to write point batch to database %q: %s", s.Database, err)
s.Logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.Database, err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}
}

View File

@ -3,10 +3,9 @@ package opentsdb
import (
"errors"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"reflect"
"strings"
"sync/atomic"
@ -17,6 +16,7 @@ import (
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/uber-go/zap"
)
func Test_Service_OpenClose(t *testing.T) {
@ -276,8 +276,11 @@ func NewTestService(database string, bind string) *TestService {
return nil, nil
}
if !testing.Verbose() {
service.Service.Logger = log.New(ioutil.Discard, "", log.LstdFlags)
if testing.Verbose() {
service.Service.WithLogger(zap.New(
zap.NewTextEncoder(),
zap.Output(os.Stderr),
))
}
service.Service.MetaClient = service.MetaClient

View File

@ -1,11 +1,11 @@
package precreator // import "github.com/influxdata/influxdb/services/precreator"
import (
"io"
"log"
"os"
"fmt"
"sync"
"time"
"github.com/uber-go/zap"
)
// Service manages the shard precreation service.
@ -13,7 +13,7 @@ type Service struct {
checkInterval time.Duration
advancePeriod time.Duration
Logger *log.Logger
Logger zap.Logger
done chan struct{}
wg sync.WaitGroup
@ -28,16 +28,14 @@ func NewService(c Config) (*Service, error) {
s := Service{
checkInterval: time.Duration(c.CheckInterval),
advancePeriod: time.Duration(c.AdvancePeriod),
Logger: log.New(os.Stderr, "[shard-precreation] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
}
return &s, nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
s.Logger = log.New(w, "[shard-precreation] ", log.LstdFlags)
func (s *Service) WithLogger(log zap.Logger) {
s.Logger = log.With(zap.String("service", "shard-precreation"))
}
// Open starts the precreation service.
@ -46,8 +44,8 @@ func (s *Service) Open() error {
return nil
}
s.Logger.Printf("Starting precreation service with check interval of %s, advance period of %s",
s.checkInterval, s.advancePeriod)
s.Logger.Info(fmt.Sprintf("Starting precreation service with check interval of %s, advance period of %s",
s.checkInterval, s.advancePeriod))
s.done = make(chan struct{})
@ -77,10 +75,10 @@ func (s *Service) runPrecreation() {
select {
case <-time.After(s.checkInterval):
if err := s.precreate(time.Now().UTC()); err != nil {
s.Logger.Printf("failed to precreate shards: %s", err.Error())
s.Logger.Info(fmt.Sprintf("failed to precreate shards: %s", err.Error()))
}
case <-s.done:
s.Logger.Println("Precreation service terminating")
s.Logger.Info("Precreation service terminating")
return
}
}

View File

@ -1,13 +1,12 @@
package retention // import "github.com/influxdata/influxdb/services/retention"
import (
"io"
"log"
"os"
"fmt"
"sync"
"time"
"github.com/influxdata/influxdb/services/meta"
"github.com/uber-go/zap"
)
// Service represents the retention policy enforcement service.
@ -27,7 +26,7 @@ type Service struct {
wg sync.WaitGroup
done chan struct{}
logger *log.Logger
logger zap.Logger
}
// NewService returns a configured retention policy enforcement service.
@ -35,13 +34,13 @@ func NewService(c Config) *Service {
return &Service{
checkInterval: time.Duration(c.CheckInterval),
done: make(chan struct{}),
logger: log.New(os.Stderr, "[retention] ", log.LstdFlags),
logger: zap.New(zap.NullEncoder()),
}
}
// Open starts retention policy enforcement.
func (s *Service) Open() error {
s.logger.Println("Starting retention policy enforcement service with check interval of", s.checkInterval)
s.logger.Info(fmt.Sprint("Starting retention policy enforcement service with check interval of ", s.checkInterval))
s.wg.Add(2)
go s.deleteShardGroups()
go s.deleteShards()
@ -50,16 +49,14 @@ func (s *Service) Open() error {
// Close stops retention policy enforcement.
func (s *Service) Close() error {
s.logger.Println("retention policy enforcement terminating")
s.logger.Info("retention policy enforcement terminating")
close(s.done)
s.wg.Wait()
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
s.logger = log.New(w, "[retention] ", log.LstdFlags)
func (s *Service) WithLogger(log zap.Logger) {
s.logger = log.With(zap.String("service", "retention"))
}
func (s *Service) deleteShardGroups() {
@ -78,11 +75,11 @@ func (s *Service) deleteShardGroups() {
for _, r := range d.RetentionPolicies {
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
s.logger.Printf("failed to delete shard group %d from database %s, retention policy %s: %s",
g.ID, d.Name, r.Name, err.Error())
s.logger.Info(fmt.Sprintf("failed to delete shard group %d from database %s, retention policy %s: %s",
g.ID, d.Name, r.Name, err.Error()))
} else {
s.logger.Printf("deleted shard group %d from database %s, retention policy %s",
g.ID, d.Name, r.Name)
s.logger.Info(fmt.Sprintf("deleted shard group %d from database %s, retention policy %s",
g.ID, d.Name, r.Name))
}
}
}
@ -102,7 +99,7 @@ func (s *Service) deleteShards() {
return
case <-ticker.C:
s.logger.Println("retention policy shard deletion check commencing")
s.logger.Info("retention policy shard deletion check commencing")
type deletionInfo struct {
db string
@ -123,16 +120,16 @@ func (s *Service) deleteShards() {
for _, id := range s.TSDBStore.ShardIDs() {
if di, ok := deletedShardIDs[id]; ok {
if err := s.TSDBStore.DeleteShard(id); err != nil {
s.logger.Printf("failed to delete shard ID %d from database %s, retention policy %s: %s",
id, di.db, di.rp, err.Error())
s.logger.Info(fmt.Sprintf("failed to delete shard ID %d from database %s, retention policy %s: %s",
id, di.db, di.rp, err.Error()))
continue
}
s.logger.Printf("shard ID %d from database %s, retention policy %s, deleted",
id, di.db, di.rp)
s.logger.Info(fmt.Sprintf("shard ID %d from database %s, retention policy %s, deleted",
id, di.db, di.rp))
}
}
if err := s.MetaClient.PruneShardGroups(); err != nil {
s.logger.Printf("error pruning shard groups: %s", err)
s.logger.Info(fmt.Sprintf("error pruning shard groups: %s", err))
}
}
}

View File

@ -6,10 +6,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"sync"
"time"
@ -17,6 +14,7 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/uber-go/zap"
)
const (
@ -43,20 +41,20 @@ type Service struct {
TSDBStore *tsdb.Store
Listener net.Listener
Logger *log.Logger
Logger zap.Logger
}
// NewService returns a new instance of Service.
func NewService() *Service {
return &Service{
err: make(chan error),
Logger: log.New(os.Stderr, "[snapshot] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
}
}
// Open starts the service.
func (s *Service) Open() error {
s.Logger.Println("Starting snapshot service")
s.Logger.Info("Starting snapshot service")
s.wg.Add(1)
go s.serve()
@ -72,10 +70,8 @@ func (s *Service) Close() error {
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
s.Logger = log.New(w, "[snapshot] ", log.LstdFlags)
func (s *Service) WithLogger(log zap.Logger) {
s.Logger = log.With(zap.String("service", "snapshot"))
}
// Err returns a channel for fatal out-of-band errors.
@ -89,10 +85,10 @@ func (s *Service) serve() {
// Wait for next connection.
conn, err := s.Listener.Accept()
if err != nil && strings.Contains(err.Error(), "connection closed") {
s.Logger.Println("snapshot listener closed")
s.Logger.Info("snapshot listener closed")
return
} else if err != nil {
s.Logger.Println("error accepting snapshot request: ", err.Error())
s.Logger.Info(fmt.Sprint("error accepting snapshot request: ", err.Error()))
continue
}
@ -102,7 +98,7 @@ func (s *Service) serve() {
defer s.wg.Done()
defer conn.Close()
if err := s.handleConn(conn); err != nil {
s.Logger.Println(err)
s.Logger.Info(err.Error())
}
}(conn)
}

View File

@ -3,10 +3,7 @@ package subscriber // import "github.com/influxdata/influxdb/services/subscriber
import (
"errors"
"fmt"
"io"
"log"
"net/url"
"os"
"sync"
"sync/atomic"
"time"
@ -15,6 +12,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/meta"
"github.com/uber-go/zap"
)
// Statistics for the Subscriber service.
@ -47,7 +45,7 @@ type Service struct {
WaitForDataChanged() chan struct{}
}
NewPointsWriter func(u url.URL) (PointsWriter, error)
Logger *log.Logger
Logger zap.Logger
update chan struct{}
stats *Statistics
points chan *coordinator.WritePointsRequest
@ -64,7 +62,7 @@ type Service struct {
// NewService returns a subscriber service with given settings
func NewService(c Config) *Service {
s := &Service{
Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
closed: true,
stats: &Statistics{},
conf: c,
@ -97,7 +95,7 @@ func (s *Service) Open() error {
s.waitForMetaUpdates()
}()
s.Logger.Println("opened service")
s.Logger.Info("opened service")
return nil
}
@ -112,14 +110,12 @@ func (s *Service) Close() error {
close(s.closing)
s.wg.Wait()
s.Logger.Println("closed service")
s.Logger.Info("closed service")
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
s.Logger = log.New(w, "[subscriber] ", log.LstdFlags)
func (s *Service) WithLogger(log zap.Logger) {
s.Logger = log.With(zap.String("service", "subscriber"))
}
// Statistics maintains the statistics for the subscriber service.
@ -157,7 +153,7 @@ func (s *Service) waitForMetaUpdates() {
case <-ch:
err := s.Update()
if err != nil {
s.Logger.Println("error updating subscriptions:", err)
s.Logger.Info(fmt.Sprint("error updating subscriptions: ", err))
}
case <-s.closing:
return
@ -288,7 +284,7 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) {
sub, err := s.createSubscription(se, si.Mode, si.Destinations)
if err != nil {
atomic.AddInt64(&s.stats.CreateFailures, 1)
s.Logger.Printf("Subscription creation failed for '%s' with error: %s", si.Name, err)
s.Logger.Info(fmt.Sprintf("Subscription creation failed for '%s' with error: %s", si.Name, err))
continue
}
cw := chanWriter{
@ -306,7 +302,7 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) {
}()
}
s.subs[se] = cw
s.Logger.Println("added new subscription for", se.db, se.rp)
s.Logger.Info(fmt.Sprintf("added new subscription for %s %s", se.db, se.rp))
}
}
}
@ -319,7 +315,7 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) {
// Remove it from the set
delete(s.subs, se)
s.Logger.Println("deleted old subscription for", se.db, se.rp)
s.Logger.Info(fmt.Sprintf("deleted old subscription for %s %s", se.db, se.rp))
}
}
}
@ -333,7 +329,7 @@ func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) {
return NewHTTP(u.String(), time.Duration(s.conf.HTTPTimeout))
case "https":
if s.conf.InsecureSkipVerify {
s.Logger.Println("WARNING: 'insecure-skip-verify' is true. This will skip all certificate verifications.")
s.Logger.Info("WARNING: 'insecure-skip-verify' is true. This will skip all certificate verifications.")
}
return NewHTTPS(u.String(), time.Duration(s.conf.HTTPTimeout), s.conf.InsecureSkipVerify, s.conf.CaCerts)
default:
@ -347,7 +343,7 @@ type chanWriter struct {
pw PointsWriter
pointsWritten *int64
failures *int64
logger *log.Logger
logger zap.Logger
}
// Close the chanWriter
@ -359,7 +355,7 @@ func (c chanWriter) Run() {
for wr := range c.writeRequests {
err := c.pw.WritePoints(wr)
if err != nil {
c.logger.Println(err)
c.logger.Info(err.Error())
atomic.AddInt64(c.failures, 1)
} else {
atomic.AddInt64(c.pointsWritten, int64(len(wr.Points)))

View File

@ -2,10 +2,8 @@ package udp // import "github.com/influxdata/influxdb/services/udp"
import (
"errors"
"io"
"log"
"fmt"
"net"
"os"
"sync"
"sync/atomic"
"time"
@ -13,6 +11,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/uber-go/zap"
)
const (
@ -59,7 +58,7 @@ type Service struct {
CreateDatabase(name string) (*meta.DatabaseInfo, error)
}
Logger *log.Logger
Logger zap.Logger
stats *Statistics
defaultTags models.StatisticTags
}
@ -71,7 +70,7 @@ func NewService(c Config) *Service {
config: d,
parserChan: make(chan []byte, parserChanLen),
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
stats: &Statistics{},
defaultTags: models.StatisticTags{"bind": d.BindAddress},
}
@ -96,26 +95,26 @@ func (s *Service) Open() (err error) {
s.addr, err = net.ResolveUDPAddr("udp", s.config.BindAddress)
if err != nil {
s.Logger.Printf("Failed to resolve UDP address %s: %s", s.config.BindAddress, err)
s.Logger.Info(fmt.Sprintf("Failed to resolve UDP address %s: %s", s.config.BindAddress, err))
return err
}
s.conn, err = net.ListenUDP("udp", s.addr)
if err != nil {
s.Logger.Printf("Failed to set up UDP listener at address %s: %s", s.addr, err)
s.Logger.Info(fmt.Sprintf("Failed to set up UDP listener at address %s: %s", s.addr, err))
return err
}
if s.config.ReadBuffer != 0 {
err = s.conn.SetReadBuffer(s.config.ReadBuffer)
if err != nil {
s.Logger.Printf("Failed to set UDP read buffer to %d: %s",
s.config.ReadBuffer, err)
s.Logger.Info(fmt.Sprintf("Failed to set UDP read buffer to %d: %s",
s.config.ReadBuffer, err))
return err
}
}
s.Logger.Printf("Started listening on UDP: %s", s.config.BindAddress)
s.Logger.Info(fmt.Sprintf("Started listening on UDP: %s", s.config.BindAddress))
s.wg.Add(3)
go s.serve()
@ -161,7 +160,7 @@ func (s *Service) writer() {
case batch := <-s.batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Printf("Required database %s does not yet exist: %s", s.config.Database, err.Error())
s.Logger.Info(fmt.Sprintf("Required database %s does not yet exist: %s", s.config.Database, err.Error()))
continue
}
@ -169,7 +168,7 @@ func (s *Service) writer() {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.Logger.Printf("failed to write point batch to database %q: %s", s.config.Database, err)
s.Logger.Info(fmt.Sprintf("failed to write point batch to database %q: %s", s.config.Database, err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}
@ -195,7 +194,7 @@ func (s *Service) serve() {
n, _, err := s.conn.ReadFromUDP(buf)
if err != nil {
atomic.AddInt64(&s.stats.ReadFail, 1)
s.Logger.Printf("Failed to read UDP message: %s", err)
s.Logger.Info(fmt.Sprintf("Failed to read UDP message: %s", err))
continue
}
atomic.AddInt64(&s.stats.BytesReceived, int64(n))
@ -218,7 +217,7 @@ func (s *Service) parser() {
points, err := models.ParsePointsWithPrecision(buf, time.Now().UTC(), s.config.Precision)
if err != nil {
atomic.AddInt64(&s.stats.PointsParseFail, 1)
s.Logger.Printf("Failed to parse points: %s", err)
s.Logger.Info(fmt.Sprintf("Failed to parse points: %s", err))
continue
}
@ -251,7 +250,7 @@ func (s *Service) Close() error {
s.done = nil
s.conn = nil
s.Logger.Print("Service closed")
s.Logger.Info("Service closed")
return nil
}
@ -293,10 +292,8 @@ func (s *Service) createInternalStorage() error {
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
s.Logger = log.New(w, "[udp] ", log.LstdFlags)
func (s *Service) WithLogger(log zap.Logger) {
s.Logger = log.With(zap.String("service", "udp"))
}
// Addr returns the listener's address

View File

@ -2,13 +2,14 @@ package udp
import (
"errors"
"io/ioutil"
"os"
"testing"
"time"
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/uber-go/zap"
)
func TestService_OpenClose(t *testing.T) {
@ -142,8 +143,11 @@ func NewTestService(c *Config) *TestService {
MetaClient: &internal.MetaClientMock{},
}
if !testing.Verbose() {
service.Service.SetLogOutput(ioutil.Discard)
if testing.Verbose() {
service.Service.WithLogger(zap.New(
zap.NewTextEncoder(),
zap.Output(os.Stderr),
))
}
service.Service.MetaClient = service.MetaClient

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/uber-go/zap"
)
var (
@ -27,7 +28,7 @@ type Engine interface {
Open() error
Close() error
SetLogOutput(io.Writer)
WithLogger(zap.Logger)
LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error
Backup(w io.Writer, basePath string, since time.Time) error

View File

@ -2,8 +2,6 @@ package tsm1
import (
"fmt"
"io"
"log"
"math"
"os"
"sort"
@ -13,6 +11,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/uber-go/zap"
)
var (
@ -588,14 +587,14 @@ func (c *Cache) entry(key string) *entry {
type CacheLoader struct {
files []string
Logger *log.Logger
Logger zap.Logger
}
// NewCacheLoader returns a new instance of a CacheLoader.
func NewCacheLoader(files []string) *CacheLoader {
return &CacheLoader{
files: files,
Logger: log.New(os.Stderr, "[cacheloader] ", log.LstdFlags),
Logger: zap.New(zap.NullEncoder()),
}
}
@ -616,7 +615,7 @@ func (cl *CacheLoader) Load(cache *Cache) error {
if err != nil {
return err
}
cl.Logger.Printf("reading file %s, size %d", f.Name(), stat.Size())
cl.Logger.Info(fmt.Sprintf("reading file %s, size %d", f.Name(), stat.Size()))
r := NewWALSegmentReader(f)
defer r.Close()
@ -625,7 +624,7 @@ func (cl *CacheLoader) Load(cache *Cache) error {
entry, err := r.Read()
if err != nil {
n := r.Count()
cl.Logger.Printf("file %s corrupt at position %d, truncating", f.Name(), n)
cl.Logger.Info(fmt.Sprintf("file %s corrupt at position %d, truncating", f.Name(), n))
if err := f.Truncate(n); err != nil {
return err
}
@ -652,10 +651,8 @@ func (cl *CacheLoader) Load(cache *Cache) error {
return nil
}
// SetLogOutput sets the logger used for all messages. It must not be called
// after the Open method has been called.
func (cl *CacheLoader) SetLogOutput(w io.Writer) {
cl.Logger = log.New(w, "[cacheloader] ", log.LstdFlags)
func (cl *CacheLoader) WithLogger(log zap.Logger) {
cl.Logger = log.With(zap.String("service", "cacheloader"))
}
// Updates the age statistic

View File

@ -6,7 +6,6 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
"path/filepath"
@ -20,6 +19,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/uber-go/zap"
)
//go:generate tmpl -data=@iterator.gen.go.tmpldata iterator.gen.go.tmpl
@ -93,9 +93,8 @@ type Engine struct {
id uint64
path string
logger *log.Logger // Logger to be used for important messages
traceLogger *log.Logger // Logger to be used when trace-logging is on.
logOutput io.Writer // Writer to be logger and traceLogger if active.
logger zap.Logger // Logger to be used for important messages
traceLogger zap.Logger // Logger to be used when trace-logging is on.
traceLogging bool
// TODO(benbjohnson): Index needs to be moved entirely into engine.
@ -136,12 +135,12 @@ func NewEngine(id uint64, path string, walPath string, opt tsdb.EngineOptions) t
FileStore: fs,
}
logger := zap.New(zap.NullEncoder())
e := &Engine{
id: id,
path: path,
logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags),
traceLogger: log.New(ioutil.Discard, "[tsm1] ", log.LstdFlags),
logOutput: os.Stderr,
logger: logger,
traceLogger: logger,
traceLogging: opt.Config.TraceLoggingEnabled,
measurementFields: make(map[string]*tsdb.MeasurementFields),
@ -163,7 +162,6 @@ func NewEngine(id uint64, path string, walPath string, opt tsdb.EngineOptions) t
}
if e.traceLogging {
e.traceLogger.SetOutput(e.logOutput)
fs.enableTraceLogging(true)
w.enableTraceLogging(true)
}
@ -242,7 +240,7 @@ func (e *Engine) disableLevelCompactions(wait bool) {
if old == 0 { // first to disable should cleanup
if err := e.cleanup(); err != nil {
e.logger.Printf("error cleaning up temp file: %v", err)
e.logger.Info(fmt.Sprintf("error cleaning up temp file: %v", err))
}
}
}
@ -424,22 +422,15 @@ func (e *Engine) Close() error {
return e.WAL.Close()
}
// SetLogOutput sets the logger used for all messages. It is safe for concurrent
// use.
func (e *Engine) SetLogOutput(w io.Writer) {
e.logger.SetOutput(w)
func (e *Engine) WithLogger(log zap.Logger) {
e.logger = log.With(zap.String("engine", "tsm1"))
// Set the trace logger's output only if trace logging is enabled.
if e.traceLogging {
e.traceLogger.SetOutput(w)
e.traceLogger = e.logger
}
e.WAL.SetLogOutput(w)
e.FileStore.SetLogOutput(w)
e.mu.Lock()
e.logOutput = w
e.mu.Unlock()
e.WAL.WithLogger(e.logger)
e.FileStore.WithLogger(e.logger)
}
// LoadMetadataIndex loads the shard metadata into memory.
@ -472,7 +463,7 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er
fieldType, err := entry.values.InfluxQLType()
if err != nil {
e.logger.Printf("error getting the data type of values for key %s: %s", key, err.Error())
e.logger.Info(fmt.Sprintf("error getting the data type of values for key %s: %s", key, err.Error()))
continue
}
@ -481,7 +472,7 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er
}
}
e.traceLogger.Printf("Meta data index for shard %d loaded in %v", shardID, time.Since(now))
e.traceLogger.Info(fmt.Sprintf("Meta data index for shard %d loaded in %v", shardID, time.Since(now)))
return nil
}
@ -853,7 +844,7 @@ func (e *Engine) WriteSnapshot() error {
defer func() {
if started != nil {
e.Cache.UpdateCompactTime(time.Now().Sub(*started))
e.logger.Printf("Snapshot for path %s written in %v", e.path, time.Since(*started))
e.logger.Info(fmt.Sprintf("Snapshot for path %s written in %v", e.path, time.Since(*started)))
}
}()
@ -890,7 +881,7 @@ func (e *Engine) WriteSnapshot() error {
// holding the engine write lock.
dedup := time.Now()
snapshot.Deduplicate()
e.traceLogger.Printf("Snapshot for path %s deduplicated in %v", e.path, time.Since(dedup))
e.traceLogger.Info(fmt.Sprintf("Snapshot for path %s deduplicated in %v", e.path, time.Since(dedup)))
return e.writeSnapshotAndCommit(closedFiles, snapshot)
}
@ -919,7 +910,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
// write the new snapshot files
newFiles, err := e.Compactor.WriteSnapshot(snapshot)
if err != nil {
e.logger.Printf("error writing snapshot from compactor: %v", err)
e.logger.Info(fmt.Sprintf("error writing snapshot from compactor: %v", err))
return err
}
@ -928,7 +919,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
// update the file store with these new files
if err := e.FileStore.Replace(nil, newFiles); err != nil {
e.logger.Printf("error adding new TSM files from snapshot: %v", err)
e.logger.Info(fmt.Sprintf("error adding new TSM files from snapshot: %v", err))
return err
}
@ -936,7 +927,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
e.Cache.ClearSnapshot(true)
if err := e.WAL.Remove(closedFiles); err != nil {
e.logger.Printf("error removing closed wal segments: %v", err)
e.logger.Info(fmt.Sprintf("error removing closed wal segments: %v", err))
}
return nil
@ -955,10 +946,10 @@ func (e *Engine) compactCache(quit <-chan struct{}) {
e.Cache.UpdateAge()
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
start := time.Now()
e.traceLogger.Printf("Compacting cache for %s", e.path)
e.traceLogger.Info(fmt.Sprintf("Compacting cache for %s", e.path))
err := e.WriteSnapshot()
if err != nil && err != errCompactionsDisabled {
e.logger.Printf("error writing snapshot: %v", err)
e.logger.Info(fmt.Sprintf("error writing snapshot: %v", err))
atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
} else {
atomic.AddInt64(&e.stats.CacheCompactions, 1)
@ -1034,7 +1025,7 @@ type compactionStrategy struct {
successStat *int64
errorStat *int64
logger *log.Logger
logger zap.Logger
compactor *Compactor
fileStore *FileStore
}
@ -1060,9 +1051,9 @@ func (s *compactionStrategy) Apply() {
func (s *compactionStrategy) compactGroup(groupNum int) {
group := s.compactionGroups[groupNum]
start := time.Now()
s.logger.Printf("beginning %s compaction of group %d, %d TSM files", s.description, groupNum, len(group))
s.logger.Info(fmt.Sprintf("beginning %s compaction of group %d, %d TSM files", s.description, groupNum, len(group)))
for i, f := range group {
s.logger.Printf("compacting %s group (%d) %s (#%d)", s.description, groupNum, f, i)
s.logger.Info(fmt.Sprintf("compacting %s group (%d) %s (#%d)", s.description, groupNum, f, i))
}
files, err := func() ([]string, error) {
@ -1079,7 +1070,7 @@ func (s *compactionStrategy) compactGroup(groupNum int) {
if err != nil {
if err == errCompactionsDisabled || err == errCompactionInProgress {
s.logger.Printf("aborted %s compaction group (%d). %v", s.description, groupNum, err)
s.logger.Info(fmt.Sprintf("aborted %s compaction group (%d). %v", s.description, groupNum, err))
if err == errCompactionInProgress {
time.Sleep(time.Second)
@ -1087,23 +1078,23 @@ func (s *compactionStrategy) compactGroup(groupNum int) {
return
}
s.logger.Printf("error compacting TSM files: %v", err)
s.logger.Info(fmt.Sprintf("error compacting TSM files: %v", err))
atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second)
return
}
if err := s.fileStore.Replace(group, files); err != nil {
s.logger.Printf("error replacing new TSM files: %v", err)
s.logger.Info(fmt.Sprintf("error replacing new TSM files: %v", err))
atomic.AddInt64(s.errorStat, 1)
time.Sleep(time.Second)
return
}
for i, f := range files {
s.logger.Printf("compacted %s group (%d) into %s (#%d)", s.description, groupNum, f, i)
s.logger.Info(fmt.Sprintf("compacted %s group (%d) into %s (#%d)", s.description, groupNum, f, i))
}
s.logger.Printf("compacted %s %d files into %d files in %s", s.description, len(group), len(files), time.Since(start))
s.logger.Info(fmt.Sprintf("compacted %s %d files into %d files in %s", s.description, len(group), len(files), time.Since(start)))
atomic.AddInt64(s.successStat, 1)
}
@ -1188,12 +1179,12 @@ func (e *Engine) reloadCache() error {
e.Cache.SetMaxSize(0)
loader := NewCacheLoader(files)
loader.SetLogOutput(e.logOutput)
loader.WithLogger(e.logger)
if err := loader.Load(e.Cache); err != nil {
return err
}
e.traceLogger.Printf("Reloaded WAL cache %s in %v", e.WAL.Path(), time.Since(now))
e.traceLogger.Info(fmt.Sprintf("Reloaded WAL cache %s in %v", e.WAL.Path(), time.Since(now)))
return nil
}

View File

@ -4,7 +4,6 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
"path/filepath"
@ -16,6 +15,7 @@ import (
"time"
"github.com/influxdata/influxdb/models"
"github.com/uber-go/zap"
)
type TSMFile interface {
@ -133,9 +133,9 @@ type FileStore struct {
files []TSMFile
logger *log.Logger // Logger to be used for important messages
traceLogger *log.Logger // Logger to be used when trace-logging is on.
logOutput io.Writer // Writer to be logger and traceLogger if active.
logger zap.Logger // Logger to be used for important messages
traceLogger zap.Logger // Logger to be used when trace-logging is on.
logOutput io.Writer // Writer to be logger and traceLogger if active.
traceLogging bool
stats *FileStoreStatistics
@ -168,13 +168,12 @@ func (f FileStat) ContainsKey(key string) bool {
}
func NewFileStore(dir string) *FileStore {
logger := log.New(os.Stderr, "[filestore] ", log.LstdFlags)
logger := zap.New(zap.NullEncoder())
fs := &FileStore{
dir: dir,
lastModified: time.Time{},
logger: logger,
traceLogger: log.New(ioutil.Discard, "[filestore] ", log.LstdFlags),
logOutput: os.Stderr,
traceLogger: logger,
stats: &FileStoreStatistics{},
purger: &purger{
files: map[string]TSMFile{},
@ -189,23 +188,17 @@ func NewFileStore(dir string) *FileStore {
func (f *FileStore) enableTraceLogging(enabled bool) {
f.traceLogging = enabled
if enabled {
f.traceLogger.SetOutput(f.logOutput)
f.traceLogger = f.logger
}
}
// SetLogOutput sets the logger used for all messages. It is safe for concurrent
// use.
func (f *FileStore) SetLogOutput(w io.Writer) {
f.logger.SetOutput(w)
func (f *FileStore) WithLogger(log zap.Logger) {
f.logger = log.With(zap.String("service", "filestore"))
f.purger.logger = f.logger
// Set the trace logger's output only if trace logging is enabled.
if f.traceLogging {
f.traceLogger.SetOutput(w)
f.traceLogger = f.logger
}
f.mu.Lock()
f.logOutput = w
f.mu.Unlock()
}
// FileStoreStatistics keeps statistics about the file store.
@ -422,7 +415,7 @@ func (f *FileStore) Open() error {
go func(idx int, file *os.File) {
start := time.Now()
df, err := NewTSMReader(file)
f.logger.Printf("%s (#%d) opened in %v", file.Name(), idx, time.Now().Sub(start))
f.logger.Info(fmt.Sprintf("%s (#%d) opened in %v", file.Name(), idx, time.Now().Sub(start)))
if err != nil {
readerC <- &res{r: df, err: fmt.Errorf("error opening memory map for file %s: %v", file.Name(), err)}
@ -802,7 +795,7 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
// CreateSnapshot will create hardlinks for all tsm and tombstone files
// in the path provided
func (f *FileStore) CreateSnapshot() (string, error) {
f.traceLogger.Printf("Creating snapshot in %s", f.dir)
f.traceLogger.Info(fmt.Sprintf("Creating snapshot in %s", f.dir))
files := f.Files()
f.mu.Lock()
@ -1160,7 +1153,7 @@ type purger struct {
files map[string]TSMFile
running bool
logger *log.Logger
logger zap.Logger
}
func (p *purger) add(files []TSMFile) {
@ -1192,12 +1185,12 @@ func (p *purger) purge() {
}
if err := v.Close(); err != nil {
p.logger.Printf("purge: close file: %v", err)
p.logger.Info(fmt.Sprintf("purge: close file: %v", err))
continue
}
if err := v.Remove(); err != nil {
p.logger.Printf("purge: remove file: %v", err)
p.logger.Info(fmt.Sprintf("purge: remove file: %v", err))
continue
}
delete(p.files, k)

View File

@ -11,6 +11,7 @@ import (
"time"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/uber-go/zap"
)
func TestFileStore_Read(t *testing.T) {
@ -2441,8 +2442,11 @@ func BenchmarkFileStore_Stats(b *testing.B) {
}
fs := tsm1.NewFileStore(dir)
if !testing.Verbose() {
fs.SetLogOutput(ioutil.Discard)
if testing.Verbose() {
fs.WithLogger(zap.New(
zap.NewTextEncoder(),
zap.Output(os.Stderr),
))
}
if err := fs.Open(); err != nil {

View File

@ -5,8 +5,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
"path/filepath"
@ -21,6 +19,7 @@ import (
"github.com/golang/snappy"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/uber-go/zap"
)
const (
@ -86,9 +85,8 @@ type WAL struct {
closing chan struct{}
// WALOutput is the writer used by the logger.
logger *log.Logger // Logger to be used for important messages
traceLogger *log.Logger // Logger to be used when trace-logging is on.
logOutput io.Writer // Writer to be logger and traceLogger if active.
logger zap.Logger // Logger to be used for important messages
traceLogger zap.Logger // Logger to be used when trace-logging is on.
traceLogging bool
// SegmentSize is the file size at which a segment file will be rotated
@ -100,6 +98,7 @@ type WAL struct {
}
func NewWAL(path string) *WAL {
logger := zap.New(zap.NullEncoder())
return &WAL{
path: path,
@ -108,9 +107,8 @@ func NewWAL(path string) *WAL {
closing: make(chan struct{}),
stats: &WALStatistics{},
limiter: limiter.NewFixed(defaultWaitingWALWrites),
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
traceLogger: log.New(ioutil.Discard, "[tsm1wal] ", log.LstdFlags),
logOutput: os.Stderr,
logger: logger,
traceLogger: logger,
}
}
@ -118,23 +116,16 @@ func NewWAL(path string) *WAL {
func (l *WAL) enableTraceLogging(enabled bool) {
l.traceLogging = enabled
if enabled {
l.traceLogger.SetOutput(l.logOutput)
l.traceLogger = l.logger
}
}
// SetLogOutput sets the location that logs are written to. It is safe for
// concurrent use.
func (l *WAL) SetLogOutput(w io.Writer) {
l.logger.SetOutput(w)
func (l *WAL) WithLogger(log zap.Logger) {
l.logger = log.With(zap.String("service", "wal"))
// Set the trace logger's output only if trace logging is enabled.
if l.traceLogging {
l.traceLogger.SetOutput(w)
l.traceLogger = l.logger
}
l.mu.Lock()
l.logOutput = w
l.mu.Unlock()
}
// WALStatistics maintains statistics about the WAL.
@ -171,8 +162,8 @@ func (l *WAL) Open() error {
l.mu.Lock()
defer l.mu.Unlock()
l.traceLogger.Printf("tsm1 WAL starting with %d segment size\n", l.SegmentSize)
l.traceLogger.Printf("tsm1 WAL writing to %s\n", l.path)
l.traceLogger.Info(fmt.Sprintf("tsm1 WAL starting with %d segment size\n", l.SegmentSize))
l.traceLogger.Info(fmt.Sprintf("tsm1 WAL writing to %s\n", l.path))
if err := os.MkdirAll(l.path, 0777); err != nil {
return err
@ -277,7 +268,7 @@ func (l *WAL) Remove(files []string) error {
l.mu.Lock()
defer l.mu.Unlock()
for _, fn := range files {
l.traceLogger.Printf("Removing %s", fn)
l.traceLogger.Info(fmt.Sprintf("Removing %s", fn))
os.RemoveAll(fn)
}
@ -425,7 +416,7 @@ func (l *WAL) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
l.traceLogger.Printf("Closing %s", l.path)
l.traceLogger.Info(fmt.Sprintf("Closing %s", l.path))
// Close, but don't set to nil so future goroutines can still be signaled
close(l.closing)

View File

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
@ -18,6 +17,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
internal "github.com/influxdata/influxdb/tsdb/internal"
"github.com/uber-go/zap"
)
// monitorStatInterval is the interval at which the shard is inspected
@ -119,9 +119,8 @@ type Shard struct {
stats *ShardStatistics
defaultTags models.StatisticTags
logger *log.Logger
// used by logger. Referenced so it can be passed down to new caches.
logOutput io.Writer
baseLogger zap.Logger
logger zap.Logger
EnableOnOpen bool
}
@ -129,6 +128,7 @@ type Shard struct {
// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard {
db, rp := DecodeStorePath(path)
logger := zap.New(zap.NullEncoder())
s := &Shard{
index: index,
id: id,
@ -150,24 +150,19 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
database: db,
retentionPolicy: rp,
logger: log.New(os.Stderr, "[shard] ", log.LstdFlags),
logOutput: os.Stderr,
logger: logger,
baseLogger: logger,
EnableOnOpen: true,
}
return s
}
// SetLogOutput sets the writer to which log output will be written. It is safe
// for concurrent use.
func (s *Shard) SetLogOutput(w io.Writer) {
s.logger.SetOutput(w)
func (s *Shard) WithLogger(log zap.Logger) {
s.baseLogger = log
if err := s.ready(); err == nil {
s.engine.SetLogOutput(w)
s.engine.WithLogger(s.baseLogger)
}
s.mu.Lock()
s.logOutput = w
s.mu.Unlock()
s.logger = s.baseLogger.With(zap.String("service", "shard"))
}
// SetEnabled enables the shard for queries and write. When disabled, all
@ -246,7 +241,7 @@ func (s *Shard) Open() error {
}
// Set log output on the engine.
e.SetLogOutput(s.logOutput)
e.WithLogger(s.baseLogger)
// Disable compactions while loading the index
e.SetEnabled(false)
@ -267,7 +262,7 @@ func (s *Shard) Open() error {
s.engine = e
s.logger.Printf("%s database index loaded in %s", s.path, time.Now().Sub(start))
s.logger.Info(fmt.Sprintf("%s database index loaded in %s", s.path, time.Now().Sub(start)))
go s.monitor()
@ -549,7 +544,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
// verify the tags and fields
tags := p.Tags()
if v := tags.Get(timeBytes); v != nil {
s.logger.Printf("dropping tag 'time' from '%s'\n", p.PrecisionString(""))
s.logger.Info(fmt.Sprintf("dropping tag 'time' from '%s'\n", p.PrecisionString("")))
tags.Delete(timeBytes)
p.SetTags(tags)
}
@ -558,7 +553,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
iter := p.FieldIterator()
for iter.Next() {
if bytes.Equal(iter.FieldKey(), timeBytes) {
s.logger.Printf("dropping field 'time' from '%s'\n", p.PrecisionString(""))
s.logger.Info(fmt.Sprintf("dropping field 'time' from '%s'\n", p.PrecisionString("")))
iter.Delete()
continue
}
@ -854,7 +849,7 @@ func (s *Shard) monitor() {
case <-t.C:
size, err := s.DiskSize()
if err != nil {
s.logger.Printf("Error collecting shard size: %v", err)
s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err))
continue
}
atomic.StoreInt64(&s.stats.DiskBytes, size)
@ -873,8 +868,8 @@ func (s *Shard) monitor() {
// Log at 80, 85, 90-100% levels
if perc == 80 || perc == 85 || perc >= 90 {
s.logger.Printf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, m.Name, k)
s.logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, m.Name, k))
}
})
}

View File

@ -19,6 +19,7 @@ import (
"github.com/influxdata/influxdb/pkg/deep"
"github.com/influxdata/influxdb/tsdb"
_ "github.com/influxdata/influxdb/tsdb/engine"
"github.com/uber-go/zap"
)
// DefaultPrecision is the precision used by the MustWritePointsString() function.
@ -229,7 +230,7 @@ func TestWriteTimeTag(t *testing.T) {
)
buf := bytes.NewBuffer(nil)
sh.SetLogOutput(buf)
sh.WithLogger(zap.New(zap.NewTextEncoder(), zap.Output(zap.AddSync(buf))))
if err := sh.WritePoints([]models.Point{pt}); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if got, exp := buf.String(), "dropping field 'time'"; !strings.Contains(got, exp) {
@ -249,7 +250,7 @@ func TestWriteTimeTag(t *testing.T) {
)
buf = bytes.NewBuffer(nil)
sh.SetLogOutput(buf)
sh.WithLogger(zap.New(zap.NewTextEncoder(), zap.Output(zap.AddSync(buf))))
if err := sh.WritePoints([]models.Point{pt}); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if got, exp := buf.String(), "dropping field 'time'"; !strings.Contains(got, exp) {
@ -290,7 +291,7 @@ func TestWriteTimeField(t *testing.T) {
)
buf := bytes.NewBuffer(nil)
sh.SetLogOutput(buf)
sh.WithLogger(zap.New(zap.NewTextEncoder(), zap.Output(zap.AddSync(buf))))
if err := sh.WritePoints([]models.Point{pt}); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if got, exp := buf.String(), "dropping tag 'time'"; !strings.Contains(got, exp) {

View File

@ -5,7 +5,6 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"runtime"
@ -18,6 +17,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/uber-go/zap"
)
var (
@ -38,10 +38,8 @@ type Store struct {
shards map[uint64]*Shard
EngineOptions EngineOptions
Logger *log.Logger
// logOutput is where output from the underlying databases will go.
logOutput io.Writer
baseLogger zap.Logger
Logger zap.Logger
closing chan struct{}
wg sync.WaitGroup
@ -53,25 +51,21 @@ type Store struct {
func NewStore(path string) *Store {
opts := NewEngineOptions()
logger := zap.New(zap.NullEncoder())
return &Store{
path: path,
EngineOptions: opts,
Logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
logOutput: os.Stderr,
Logger: logger,
baseLogger: logger,
}
}
// SetLogOutput sets the writer to which all logs are written. It is safe for
// concurrent use.
func (s *Store) SetLogOutput(w io.Writer) {
s.Logger.SetOutput(w)
for _, s := range s.shards {
s.SetLogOutput(w)
func (s *Store) WithLogger(log zap.Logger) {
s.baseLogger = log
s.Logger = log.With(zap.String("service", "store"))
for _, sh := range s.shards {
sh.WithLogger(s.baseLogger)
}
s.mu.Lock()
s.logOutput = w
s.mu.Unlock()
}
func (s *Store) Statistics(tags map[string]string) []models.Statistic {
@ -107,7 +101,7 @@ func (s *Store) Open() error {
s.shards = map[uint64]*Shard{}
s.databaseIndexes = map[string]*DatabaseIndex{}
s.Logger.Printf("Using data dir: %v", s.Path())
s.Logger.Info(fmt.Sprintf("Using data dir: %v", s.Path()))
// Create directory.
if err := os.MkdirAll(s.path, 0777); err != nil {
@ -135,7 +129,7 @@ func (s *Store) loadIndexes() error {
}
for _, db := range dbs {
if !db.IsDir() {
s.Logger.Printf("Skipping database dir: %s. Not a directory", db.Name())
s.Logger.Info(fmt.Sprintf("Skipping database dir: %s. Not a directory", db.Name()))
continue
}
s.databaseIndexes[db.Name()] = NewDatabaseIndex(db.Name())
@ -165,7 +159,7 @@ func (s *Store) loadShards() error {
for _, rp := range rps {
// retention policies should be directories. Skip anything that is not a dir.
if !rp.IsDir() {
s.Logger.Printf("Skipping retention policy dir: %s. Not a directory", rp.Name())
s.Logger.Info(fmt.Sprintf("Skipping retention policy dir: %s. Not a directory", rp.Name()))
continue
}
@ -191,7 +185,7 @@ func (s *Store) loadShards() error {
}
shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions)
shard.SetLogOutput(s.logOutput)
shard.WithLogger(s.baseLogger)
err = shard.Open()
if err != nil {
@ -200,7 +194,7 @@ func (s *Store) loadShards() error {
}
resC <- &res{s: shard}
s.Logger.Printf("%s opened in %s", path, time.Now().Sub(start))
s.Logger.Info(fmt.Sprintf("%s opened in %s", path, time.Now().Sub(start)))
}(s.databaseIndexes[db], db, rp.Name(), sh.Name())
}
}
@ -209,7 +203,7 @@ func (s *Store) loadShards() error {
for i := 0; i < n; i++ {
res := <-resC
if res.err != nil {
s.Logger.Println(res.err)
s.Logger.Info(res.err.Error())
continue
}
s.shards[res.s.id] = res.s
@ -319,7 +313,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(shardID, db, path, walPath, s.EngineOptions)
shard.SetLogOutput(s.logOutput)
shard.WithLogger(s.baseLogger)
shard.EnableOnOpen = enabled
if err := shard.Open(); err != nil {