From 9dc09c5257edde96aa5170c101883c2ee44459f5 Mon Sep 17 00:00:00 2001 From: Stephen Gutekanst Date: Wed, 20 Apr 2016 13:07:08 -0700 Subject: [PATCH] Make logging output location more programmatically configurable (#6213) This has various benefits: - Users embedding InfluxDB within other Go programs can specify a different logger / prefix easily. - More consistent with code used elsewhere in InfluxDB (e.g. services, other `run.Server.*` fields, etc). - This is also more efficient, because it means `executeQuery` no longer allocates a single `*log.Logger` each time it is called. --- CHANGELOG.md | 1 + cluster/points_writer.go | 7 ++++ cluster/service.go | 7 ++-- cluster/statement_executor_test.go | 6 ++-- cmd/influxd/restore/restore.go | 3 +- cmd/influxd/run/server.go | 44 ++++++++++++++++++++++---- cmd/influxd/run/server_helpers_test.go | 15 +-------- influxql/query_executor.go | 22 ++++++------- monitor/service.go | 8 +++-- services/admin/service.go | 8 +++-- services/collectd/service.go | 8 +++-- services/continuous_querier/service.go | 8 +++-- services/copier/service.go | 7 ++-- services/copier/service_test.go | 3 +- services/graphite/service.go | 8 +++-- services/httpd/handler.go | 12 +++---- services/httpd/service.go | 8 +++-- services/meta/client.go | 6 ++-- services/opentsdb/service.go | 7 ++-- services/precreator/service.go | 8 +++-- services/retention/service.go | 8 +++-- services/snapshotter/service.go | 8 +++-- services/subscriber/service.go | 8 +++-- services/udp/service.go | 8 +++-- tsdb/engine/tsm1/cache.go | 7 ++++ tsdb/engine/tsm1/engine.go | 18 ++++++++--- tsdb/engine/tsm1/file_store.go | 7 ++++ tsdb/engine/tsm1/wal.go | 6 ++++ tsdb/shard.go | 9 ++++++ tsdb/store.go | 16 ++++++++++ 30 files changed, 201 insertions(+), 90 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f178ed1db..5f918fb5a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Features +- [#6213](https://github.com/influxdata/influxdb/pull/6213): Make logging output location more programmatically configurable. - [#6237](https://github.com/influxdata/influxdb/issues/6237): Enable continuous integration testing on Windows platform via AppVeyor. Thanks @mvadu - [#6263](https://github.com/influxdata/influxdb/pull/6263): Reduce UDP Service allocation size. - [#6228](https://github.com/influxdata/influxdb/pull/6228): Support for multiple listeners for collectd and OpenTSDB inputs. diff --git a/cluster/points_writer.go b/cluster/points_writer.go index f7616a1674..1547b88321 100644 --- a/cluster/points_writer.go +++ b/cluster/points_writer.go @@ -3,6 +3,7 @@ package cluster import ( "errors" "expvar" + "io" "log" "os" "sync" @@ -140,6 +141,12 @@ func (w *PointsWriter) Close() error { return nil } +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (w *PointsWriter) SetLogOutput(lw io.Writer) { + w.Logger = log.New(lw, "[write] ", log.LstdFlags) +} + // MapShards maps the points contained in wp to a ShardMapping. If a point // maps to a shard group or shard that does not currently exist, it will be // created before returning the mapping. diff --git a/cluster/service.go b/cluster/service.go index af489e492e..06b453f51b 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -92,9 +92,10 @@ func (s *Service) Open() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.Logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[cluster] ", log.LstdFlags) } // serve accepts connections from the listener and handles them. diff --git a/cluster/statement_executor_test.go b/cluster/statement_executor_test.go index 53a7fb8f57..b806bac04d 100644 --- a/cluster/statement_executor_test.go +++ b/cluster/statement_executor_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "io" + "log" "os" "reflect" "testing" @@ -180,10 +181,11 @@ func NewQueryExecutor() *QueryExecutor { } e.QueryExecutor.StatementExecutor = e.StatementExecutor - e.QueryExecutor.LogOutput = &e.LogOutput + var out io.Writer = &e.LogOutput if testing.Verbose() { - e.QueryExecutor.LogOutput = io.MultiWriter(e.QueryExecutor.LogOutput, os.Stderr) + out = io.MultiWriter(out, os.Stderr) } + e.QueryExecutor.Logger = log.New(out, "[query] ", log.LstdFlags) return e } diff --git a/cmd/influxd/restore/restore.go b/cmd/influxd/restore/restore.go index 3fcbf476cf..35f071146a 100644 --- a/cmd/influxd/restore/restore.go +++ b/cmd/influxd/restore/restore.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "net" "os" "path/filepath" @@ -183,7 +182,7 @@ func (cmd *Command) unpackMeta() error { } client := meta.NewClient(c) - client.SetLogger(log.New(ioutil.Discard, "", 0)) + client.SetLogOutput(ioutil.Discard) if err := client.Open(); err != nil { return err } diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index f10a0ae1a5..6583783add 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -2,6 +2,7 @@ package run import ( "fmt" + "io" "log" "net" "os" @@ -52,6 +53,8 @@ type Server struct { BindAddress string Listener net.Listener + Logger *log.Logger + MetaClient *meta.Client TSDBStore *tsdb.Store @@ -85,6 +88,10 @@ type Server struct { tcpAddr string config *Config + + // logOutput is the writer to which all services should be configured to + // write logs to after appension. + logOutput io.Writer } // NewServer returns a new instance of Server built from a config. @@ -130,6 +137,8 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { BindAddress: bind, + Logger: log.New(os.Stderr, "", log.LstdFlags), + MetaClient: meta.NewClient(c.Meta), Monitor: monitor.New(c.Monitor), @@ -140,7 +149,8 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { httpUseTLS: c.HTTPD.HTTPSEnabled, tcpAddr: bind, - config: c, + config: c, + logOutput: os.Stderr, } if err := s.MetaClient.Open(); err != nil { @@ -176,7 +186,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { s.QueryExecutor.QueryTimeout = time.Duration(c.Cluster.QueryTimeout) s.QueryExecutor.MaxConcurrentQueries = c.Cluster.MaxConcurrentQueries if c.Data.QueryLogEnabled { - s.QueryExecutor.LogOutput = os.Stderr + s.QueryExecutor.Logger = log.New(os.Stderr, "[query] ", log.LstdFlags) } // Initialize the monitor @@ -185,7 +195,6 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) { s.Monitor.Branch = s.buildInfo.Branch s.Monitor.BuildTime = s.buildInfo.Time s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter) - return s, nil } @@ -211,6 +220,13 @@ func (s *Server) appendCopierService() { s.CopierService = srv } +// SetLogOutput sets the logger used for all messages. It must not be called +// after the Open method has been called. +func (s *Server) SetLogOutput(w io.Writer) { + s.Logger = log.New(os.Stderr, "", log.LstdFlags) + s.logOutput = w +} + // Err returns an error channel that multiplexes all out of band errors received from all services. func (s *Server) Err() <-chan error { return s.err } @@ -265,6 +281,21 @@ func (s *Server) Open() error { s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader) s.CopierService.Listener = mux.Listen(copier.MuxHeader) + // Configure logging for all services and clients. + w := s.logOutput + s.MetaClient.SetLogOutput(w) + s.TSDBStore.SetLogOutput(w) + s.QueryExecutor.SetLogOutput(w) + s.PointsWriter.SetLogOutput(w) + s.Subscriber.SetLogOutput(w) + for _, svc := range s.Services { + svc.SetLogOutput(w) + } + s.ClusterService.SetLogOutput(w) + s.SnapshotterService.SetLogOutput(w) + s.CopierService.SetLogOutput(w) + s.Monitor.SetLogOutput(w) + // Open TSDB store. if err := s.TSDBStore.Open(); err != nil { return fmt.Errorf("open tsdb store: %s", err) @@ -360,7 +391,7 @@ func (s *Server) startServerReporting() { func (s *Server) reportServer() { dis, err := s.MetaClient.Databases() if err != nil { - log.Printf("failed to retrieve databases for reporting: %s", err.Error()) + s.Logger.Printf("failed to retrieve databases for reporting: %s", err.Error()) return } numDatabases := len(dis) @@ -384,7 +415,7 @@ func (s *Server) reportServer() { clusterID := s.MetaClient.ClusterID() if err != nil { - log.Printf("failed to retrieve cluster ID for reporting: %s", err.Error()) + s.Logger.Printf("failed to retrieve cluster ID for reporting: %s", err.Error()) return } @@ -407,7 +438,7 @@ func (s *Server) reportServer() { }, } - log.Printf("Sending anonymous usage statistics to m.influxdb.com") + s.Logger.Printf("Sending anonymous usage statistics to m.influxdb.com") go cl.Save(usage) } @@ -444,6 +475,7 @@ func (s *Server) MetaServers() []string { // Service represents a service attached to the server. type Service interface { + SetLogOutput(w io.Writer) Open() error Close() error } diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index b46176851e..2d14ba50be 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "math" "net/http" "net/url" @@ -480,19 +479,7 @@ func writeTestData(s *Server, t *Test) error { func configureLogging(s *Server) { // Set the logger to discard unless verbose is on if !testing.Verbose() { - type logSetter interface { - SetLogger(*log.Logger) - } - nullLogger := log.New(ioutil.Discard, "", 0) - s.TSDBStore.Logger = nullLogger - s.Monitor.SetLogger(nullLogger) - s.QueryExecutor.LogOutput = ioutil.Discard - s.Subscriber.SetLogger(nullLogger) - for _, service := range s.Services { - if service, ok := service.(logSetter); ok { - service.SetLogger(nullLogger) - } - } + s.SetLogOutput(ioutil.Discard) } } diff --git a/influxql/query_executor.go b/influxql/query_executor.go index 5336785d74..998bceac79 100644 --- a/influxql/query_executor.go +++ b/influxql/query_executor.go @@ -108,9 +108,9 @@ type QueryExecutor struct { // Maximum number of concurrent queries. MaxConcurrentQueries int - // Output of all logging. + // Logger to use for all logging. // Defaults to discarding all log output. - LogOutput io.Writer + Logger *log.Logger // Used for managing and tracking running queries. queries map[uint64]*QueryTask @@ -126,7 +126,7 @@ type QueryExecutor struct { func NewQueryExecutor() *QueryExecutor { return &QueryExecutor{ QueryTimeout: DefaultQueryTimeout, - LogOutput: ioutil.Discard, + Logger: log.New(ioutil.Discard, "[query] ", log.LstdFlags), queries: make(map[uint64]*QueryTask), nextID: 1, statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil), @@ -147,6 +147,12 @@ func (e *QueryExecutor) Close() error { return nil } +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (e *QueryExecutor) SetLogOutput(w io.Writer) { + e.Logger = log.New(w, "[query] ", log.LstdFlags) +} + // ExecuteQuery executes each statement within a query. func (e *QueryExecutor) ExecuteQuery(query *Query, database string, chunkSize int, closing chan struct{}) <-chan *Result { results := make(chan *Result) @@ -171,8 +177,6 @@ func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize in } defer e.killQuery(qid) - logger := e.logger() - // Setup the execution context that will be used when executing statements. ctx := ExecutionContext{ QueryID: qid, @@ -180,7 +184,7 @@ func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize in Results: results, Database: database, ChunkSize: chunkSize, - Log: logger, + Log: e.Logger, InterruptCh: task.closing, } @@ -214,7 +218,7 @@ loop: } // Log each normalized statement. - logger.Println(stmt.String()) + e.Logger.Println(stmt.String()) // Handle a query management queries specially so they don't go // to the underlying statement executor. @@ -314,10 +318,6 @@ func (e *QueryExecutor) executeShowQueriesStatement(q *ShowQueriesStatement) (mo }}, nil } -func (e *QueryExecutor) logger() *log.Logger { - return log.New(e.LogOutput, "[query] ", log.LstdFlags) -} - func (e *QueryExecutor) query(qid uint64) (*QueryTask, bool) { e.mu.RLock() query, ok := e.queries[qid] diff --git a/monitor/service.go b/monitor/service.go index 4fe730867e..8effde6f2e 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -3,6 +3,7 @@ package monitor // import "github.com/influxdata/influxdb/monitor" import ( "expvar" "fmt" + "io" "log" "os" "runtime" @@ -116,9 +117,10 @@ func (m *Monitor) Close() { m.done = nil } -// SetLogger sets the internal logger to the logger passed in. -func (m *Monitor) SetLogger(l *log.Logger) { - m.Logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (m *Monitor) SetLogOutput(w io.Writer) { + m.Logger = log.New(w, "[monitor] ", log.LstdFlags) } // RegisterDiagnosticsClient registers a diagnostics client with the given name and tags. diff --git a/services/admin/service.go b/services/admin/service.go index 0857eea834..b06b0cb2f5 100644 --- a/services/admin/service.go +++ b/services/admin/service.go @@ -3,6 +3,7 @@ package admin // import "github.com/influxdata/influxdb/services/admin" import ( "crypto/tls" "fmt" + "io" "log" "net" "net/http" @@ -81,9 +82,10 @@ func (s *Service) Close() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.logger = log.New(w, "[admin] ", log.LstdFlags) } // Err returns a channel for fatal errors that occur on the listener. diff --git a/services/collectd/service.go b/services/collectd/service.go index 5cfd28d747..c5c9d3f1fa 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -3,6 +3,7 @@ package collectd // import "github.com/influxdata/influxdb/services/collectd" import ( "expvar" "fmt" + "io" "log" "net" "os" @@ -167,9 +168,10 @@ func (s *Service) Close() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.Logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[collectd] ", log.LstdFlags) } // SetTypes sets collectd types db. diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index e7490926bd..60858a4f52 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -4,6 +4,7 @@ import ( "errors" "expvar" "fmt" + "io" "log" "os" "strings" @@ -127,9 +128,10 @@ func (s *Service) Close() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.Logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[continuous_querier] ", log.LstdFlags) } // Run runs the specified continuous query, or all CQs if none is specified. diff --git a/services/copier/service.go b/services/copier/service.go index 216ac21b63..626f7f33a0 100644 --- a/services/copier/service.go +++ b/services/copier/service.go @@ -61,9 +61,10 @@ func (s *Service) Close() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.Logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[copier] ", log.LstdFlags) } // Err returns a channel for fatal out-of-band errors. diff --git a/services/copier/service_test.go b/services/copier/service_test.go index a68a413b87..5ee27c0db4 100644 --- a/services/copier/service_test.go +++ b/services/copier/service_test.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "io" "io/ioutil" - "log" "net" "os" "path/filepath" @@ -102,7 +101,7 @@ func NewService() *Service { s.Service.TSDBStore = &s.TSDBStore if !testing.Verbose() { - s.SetLogger(log.New(ioutil.Discard, "", 0)) + s.SetLogOutput(ioutil.Discard) } return s } diff --git a/services/graphite/service.go b/services/graphite/service.go index 4bc1131887..6628c74568 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -4,6 +4,7 @@ import ( "bufio" "expvar" "fmt" + "io" "log" "math" "net" @@ -200,9 +201,10 @@ func (s *Service) Close() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.logger = log.New(w, "[graphite] ", log.LstdFlags) } // Addr returns the address the Service binds to. diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 856930dc93..4fb11aec7e 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -158,9 +158,9 @@ func (h *Handler) AddRoutes(routes ...Route) { handler = cors(handler) handler = requestID(handler) if h.loggingEnabled && r.LoggingEnabled { - handler = logging(handler, r.Name, h.Logger) + handler = h.logging(handler, r.Name) } - handler = recovery(handler, r.Name, h.Logger) // make sure recovery is always last + handler = h.recovery(handler, r.Name) // make sure recovery is always last h.mux.Add(r.Method, r.Pattern, handler) @@ -768,17 +768,17 @@ func requestID(inner http.Handler) http.Handler { }) } -func logging(inner http.Handler, name string, weblog *log.Logger) http.Handler { +func (h *Handler) logging(inner http.Handler, name string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() l := &responseLogger{w: w} inner.ServeHTTP(l, r) logLine := buildLogLine(l, r, start) - weblog.Println(logLine) + h.Logger.Println(logLine) }) } -func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler { +func (h *Handler) recovery(inner http.Handler, name string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() l := &responseLogger{w: w} @@ -787,7 +787,7 @@ func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler if err := recover(); err != nil { logLine := buildLogLine(l, r, start) logLine = fmt.Sprintf(`%s [panic:%s]`, logLine, err) - weblog.Println(logLine) + h.Logger.Println(logLine) } }() diff --git a/services/httpd/service.go b/services/httpd/service.go index 462957e985..177aa06a47 100644 --- a/services/httpd/service.go +++ b/services/httpd/service.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "expvar" "fmt" + "io" "log" "net" "net/http" @@ -130,9 +131,12 @@ func (s *Service) Close() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + l := log.New(w, "[httpd]", log.LstdFlags) s.Logger = l + s.Handler.Logger = l } // Err returns a channel for fatal errors that occur on the listener. diff --git a/services/meta/client.go b/services/meta/client.go index 52c84db4c7..1deec75af1 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -957,10 +957,12 @@ func (c *Client) MarshalBinary() ([]byte, error) { return c.cacheData.MarshalBinary() } -func (c *Client) SetLogger(l *log.Logger) { +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (c *Client) SetLogOutput(w io.Writer) { c.mu.Lock() defer c.mu.Unlock() - c.logger = l + c.logger = log.New(w, "[metaclient] ", log.LstdFlags) } func (c *Client) updateAuthCache() { diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 32b205f1e1..9e887ab43e 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -177,8 +177,11 @@ func (s *Service) Close() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { s.Logger = l } +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[opentsdb] ", log.LstdFlags) +} // Err returns a channel for fatal errors that occur on the listener. func (s *Service) Err() <-chan error { return s.err } diff --git a/services/precreator/service.go b/services/precreator/service.go index 3692abec33..9571d8ee7d 100644 --- a/services/precreator/service.go +++ b/services/precreator/service.go @@ -1,6 +1,7 @@ package precreator // import "github.com/influxdata/influxdb/services/precreator" import ( + "io" "log" "os" "sync" @@ -33,9 +34,10 @@ func NewService(c Config) (*Service, error) { return &s, nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.Logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[shard-precreation] ", log.LstdFlags) } // Open starts the precreation service. diff --git a/services/retention/service.go b/services/retention/service.go index 0730d65999..4103303663 100644 --- a/services/retention/service.go +++ b/services/retention/service.go @@ -1,6 +1,7 @@ package retention // import "github.com/influxdata/influxdb/services/retention" import ( + "io" "log" "os" "sync" @@ -54,9 +55,10 @@ func (s *Service) Close() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.logger = log.New(w, "[retention] ", log.LstdFlags) } func (s *Service) deleteShardGroups() { diff --git a/services/snapshotter/service.go b/services/snapshotter/service.go index 3ba3593b96..445ca74184 100644 --- a/services/snapshotter/service.go +++ b/services/snapshotter/service.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "encoding/json" "fmt" + "io" "log" "net" "os" @@ -71,9 +72,10 @@ func (s *Service) Close() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.Logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[snapshot] ", log.LstdFlags) } // Err returns a channel for fatal out-of-band errors. diff --git a/services/subscriber/service.go b/services/subscriber/service.go index 5ffe528bdb..2a7693ef56 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -3,6 +3,7 @@ package subscriber // import "github.com/influxdata/influxdb/services/subscriber import ( "expvar" "fmt" + "io" "log" "net/url" "os" @@ -108,9 +109,10 @@ func (s *Service) Close() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.Logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[subscriber] ", log.LstdFlags) } func (s *Service) waitForMetaUpdates() { diff --git a/services/udp/service.go b/services/udp/service.go index fc07c01d0b..2e349b23e9 100644 --- a/services/udp/service.go +++ b/services/udp/service.go @@ -3,6 +3,7 @@ package udp // import "github.com/influxdata/influxdb/services/udp" import ( "errors" "expvar" + "io" "log" "net" "os" @@ -214,9 +215,10 @@ func (s *Service) Close() error { return nil } -// SetLogger sets the internal logger to the logger passed in. -func (s *Service) SetLogger(l *log.Logger) { - s.Logger = l +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Service) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[udp] ", log.LstdFlags) } // Addr returns the listener's address diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 5ce2b3fb64..1f1864dd32 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -3,6 +3,7 @@ package tsm1 import ( "expvar" "fmt" + "io" "log" "os" "sort" @@ -470,6 +471,12 @@ func (cl *CacheLoader) Load(cache *Cache) error { return nil } +// SetLogOutput sets the logger used for all messages. It must not be called +// after the Open method has been called. +func (cl *CacheLoader) SetLogOutput(w io.Writer) { + cl.Logger = log.New(w, "[cacheloader] ", log.LstdFlags) +} + // Updates the age statistic func (c *Cache) UpdateAge() { c.mu.RLock() diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 823dfb4da0..4b97c76b84 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -37,8 +37,9 @@ type Engine struct { done chan struct{} wg sync.WaitGroup - path string - logger *log.Logger + path string + logger *log.Logger + logOutput io.Writer // TODO(benbjohnson): Index needs to be moved entirely into engine. index *tsdb.DatabaseIndex @@ -79,7 +80,6 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine e := &Engine{ path: path, - logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags), measurementFields: make(map[string]*tsdb.MeasurementFields), WAL: w, @@ -96,6 +96,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize, CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration), } + e.SetLogOutput(os.Stderr) return e } @@ -194,8 +195,14 @@ func (e *Engine) Close() error { return e.WAL.Close() } -// SetLogOutput is a no-op. -func (e *Engine) SetLogOutput(w io.Writer) {} +// SetLogOutput sets the logger used for all messages. It must not be called +// after the Open method has been called. +func (e *Engine) SetLogOutput(w io.Writer) { + e.logger = log.New(w, "[tsm1] ", log.LstdFlags) + e.WAL.SetLogOutput(w) + e.FileStore.SetLogOutput(w) + e.logOutput = w +} // LoadMetadataIndex loads the shard metadata into memory. func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex) error { @@ -660,6 +667,7 @@ func (e *Engine) reloadCache() error { } loader := NewCacheLoader(files) + loader.SetLogOutput(e.logOutput) if err := loader.Load(e.Cache); err != nil { return err } diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index c27ff71290..6a5893f7e4 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -3,6 +3,7 @@ package tsm1 import ( "expvar" "fmt" + "io" "log" "os" "path/filepath" @@ -145,6 +146,12 @@ func NewFileStore(dir string) *FileStore { } } +// SetLogOutput sets the logger used for all messages. It must not be called +// after the Open method has been called. +func (f *FileStore) SetLogOutput(w io.Writer) { + f.Logger = log.New(w, "[filestore] ", log.LstdFlags) +} + // Returns the number of TSM files currently loaded func (f *FileStore) Count() int { f.mu.RLock() diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 675f14631d..9592305add 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -108,6 +108,12 @@ func NewWAL(path string) *WAL { } } +// SetLogOutput sets the location that logs are written to. It must not be +// called after the Open method has been called. +func (l *WAL) SetLogOutput(w io.Writer) { + l.logger = log.New(w, "[tsm1wal] ", log.LstdFlags) +} + // Path returns the path the log was initialized with. func (l *WAL) Path() string { l.mu.RLock() diff --git a/tsdb/shard.go b/tsdb/shard.go index 08633c4ba9..88002f34a7 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -121,6 +121,15 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti } } +// SetLogOutput sets the writer to which log output will be written. It must +// not be called after the Open method has been called. +func (s *Shard) SetLogOutput(w io.Writer) { + s.LogOutput = w + if !s.closed() { + s.engine.SetLogOutput(w) + } +} + // Path returns the path set on the shard when it was created. func (s *Shard) Path() string { return s.path } diff --git a/tsdb/store.go b/tsdb/store.go index dab21ba7ee..fe731291e3 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -42,6 +42,9 @@ type Store struct { EngineOptions EngineOptions Logger *log.Logger + // logOutput is where output from the underlying databases will go. + logOutput io.Writer + closing chan struct{} wg sync.WaitGroup opened bool @@ -57,6 +60,17 @@ func NewStore(path string) *Store { path: path, EngineOptions: opts, Logger: log.New(os.Stderr, "[store] ", log.LstdFlags), + logOutput: os.Stderr, + } +} + +// SetLogOutput sets the writer to which all logs are written. It must not be +// called after Open is called. +func (s *Store) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[store]", log.LstdFlags) + s.logOutput = w + for _, s := range s.shards { + s.SetLogOutput(w) } } @@ -158,6 +172,7 @@ func (s *Store) loadShards() error { } shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions) + shard.SetLogOutput(s.logOutput) err = shard.Open() if err != nil { @@ -283,6 +298,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) shard := NewShard(shardID, db, path, walPath, s.EngineOptions) + shard.SetLogOutput(s.logOutput) if err := shard.Open(); err != nil { return err }