Make logging output location more programmatically configurable (#6213)
This has various benefits: - Users embedding InfluxDB within other Go programs can specify a different logger / prefix easily. - More consistent with code used elsewhere in InfluxDB (e.g. services, other `run.Server.*` fields, etc). - This is also more efficient, because it means `executeQuery` no longer allocates a single `*log.Logger` each time it is called.pull/6427/head
parent
e3275f22b7
commit
9dc09c5257
|
@ -2,6 +2,7 @@
|
|||
|
||||
### Features
|
||||
|
||||
- [#6213](https://github.com/influxdata/influxdb/pull/6213): Make logging output location more programmatically configurable.
|
||||
- [#6237](https://github.com/influxdata/influxdb/issues/6237): Enable continuous integration testing on Windows platform via AppVeyor. Thanks @mvadu
|
||||
- [#6263](https://github.com/influxdata/influxdb/pull/6263): Reduce UDP Service allocation size.
|
||||
- [#6228](https://github.com/influxdata/influxdb/pull/6228): Support for multiple listeners for collectd and OpenTSDB inputs.
|
||||
|
|
|
@ -3,6 +3,7 @@ package cluster
|
|||
import (
|
||||
"errors"
|
||||
"expvar"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
|
@ -140,6 +141,12 @@ func (w *PointsWriter) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (w *PointsWriter) SetLogOutput(lw io.Writer) {
|
||||
w.Logger = log.New(lw, "[write] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// MapShards maps the points contained in wp to a ShardMapping. If a point
|
||||
// maps to a shard group or shard that does not currently exist, it will be
|
||||
// created before returning the mapping.
|
||||
|
|
|
@ -92,9 +92,10 @@ func (s *Service) Open() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.Logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.Logger = log.New(w, "[cluster] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// serve accepts connections from the listener and handles them.
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
@ -180,10 +181,11 @@ func NewQueryExecutor() *QueryExecutor {
|
|||
}
|
||||
e.QueryExecutor.StatementExecutor = e.StatementExecutor
|
||||
|
||||
e.QueryExecutor.LogOutput = &e.LogOutput
|
||||
var out io.Writer = &e.LogOutput
|
||||
if testing.Verbose() {
|
||||
e.QueryExecutor.LogOutput = io.MultiWriter(e.QueryExecutor.LogOutput, os.Stderr)
|
||||
out = io.MultiWriter(out, os.Stderr)
|
||||
}
|
||||
e.QueryExecutor.Logger = log.New(out, "[query] ", log.LstdFlags)
|
||||
|
||||
return e
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -183,7 +182,7 @@ func (cmd *Command) unpackMeta() error {
|
|||
}
|
||||
|
||||
client := meta.NewClient(c)
|
||||
client.SetLogger(log.New(ioutil.Discard, "", 0))
|
||||
client.SetLogOutput(ioutil.Discard)
|
||||
if err := client.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package run
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
|
@ -52,6 +53,8 @@ type Server struct {
|
|||
BindAddress string
|
||||
Listener net.Listener
|
||||
|
||||
Logger *log.Logger
|
||||
|
||||
MetaClient *meta.Client
|
||||
|
||||
TSDBStore *tsdb.Store
|
||||
|
@ -85,6 +88,10 @@ type Server struct {
|
|||
tcpAddr string
|
||||
|
||||
config *Config
|
||||
|
||||
// logOutput is the writer to which all services should be configured to
|
||||
// write logs to after appension.
|
||||
logOutput io.Writer
|
||||
}
|
||||
|
||||
// NewServer returns a new instance of Server built from a config.
|
||||
|
@ -130,6 +137,8 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
|||
|
||||
BindAddress: bind,
|
||||
|
||||
Logger: log.New(os.Stderr, "", log.LstdFlags),
|
||||
|
||||
MetaClient: meta.NewClient(c.Meta),
|
||||
|
||||
Monitor: monitor.New(c.Monitor),
|
||||
|
@ -140,7 +149,8 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
|||
httpUseTLS: c.HTTPD.HTTPSEnabled,
|
||||
tcpAddr: bind,
|
||||
|
||||
config: c,
|
||||
config: c,
|
||||
logOutput: os.Stderr,
|
||||
}
|
||||
|
||||
if err := s.MetaClient.Open(); err != nil {
|
||||
|
@ -176,7 +186,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
|||
s.QueryExecutor.QueryTimeout = time.Duration(c.Cluster.QueryTimeout)
|
||||
s.QueryExecutor.MaxConcurrentQueries = c.Cluster.MaxConcurrentQueries
|
||||
if c.Data.QueryLogEnabled {
|
||||
s.QueryExecutor.LogOutput = os.Stderr
|
||||
s.QueryExecutor.Logger = log.New(os.Stderr, "[query] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Initialize the monitor
|
||||
|
@ -185,7 +195,6 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
|||
s.Monitor.Branch = s.buildInfo.Branch
|
||||
s.Monitor.BuildTime = s.buildInfo.Time
|
||||
s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
@ -211,6 +220,13 @@ func (s *Server) appendCopierService() {
|
|||
s.CopierService = srv
|
||||
}
|
||||
|
||||
// SetLogOutput sets the logger used for all messages. It must not be called
|
||||
// after the Open method has been called.
|
||||
func (s *Server) SetLogOutput(w io.Writer) {
|
||||
s.Logger = log.New(os.Stderr, "", log.LstdFlags)
|
||||
s.logOutput = w
|
||||
}
|
||||
|
||||
// Err returns an error channel that multiplexes all out of band errors received from all services.
|
||||
func (s *Server) Err() <-chan error { return s.err }
|
||||
|
||||
|
@ -265,6 +281,21 @@ func (s *Server) Open() error {
|
|||
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
|
||||
s.CopierService.Listener = mux.Listen(copier.MuxHeader)
|
||||
|
||||
// Configure logging for all services and clients.
|
||||
w := s.logOutput
|
||||
s.MetaClient.SetLogOutput(w)
|
||||
s.TSDBStore.SetLogOutput(w)
|
||||
s.QueryExecutor.SetLogOutput(w)
|
||||
s.PointsWriter.SetLogOutput(w)
|
||||
s.Subscriber.SetLogOutput(w)
|
||||
for _, svc := range s.Services {
|
||||
svc.SetLogOutput(w)
|
||||
}
|
||||
s.ClusterService.SetLogOutput(w)
|
||||
s.SnapshotterService.SetLogOutput(w)
|
||||
s.CopierService.SetLogOutput(w)
|
||||
s.Monitor.SetLogOutput(w)
|
||||
|
||||
// Open TSDB store.
|
||||
if err := s.TSDBStore.Open(); err != nil {
|
||||
return fmt.Errorf("open tsdb store: %s", err)
|
||||
|
@ -360,7 +391,7 @@ func (s *Server) startServerReporting() {
|
|||
func (s *Server) reportServer() {
|
||||
dis, err := s.MetaClient.Databases()
|
||||
if err != nil {
|
||||
log.Printf("failed to retrieve databases for reporting: %s", err.Error())
|
||||
s.Logger.Printf("failed to retrieve databases for reporting: %s", err.Error())
|
||||
return
|
||||
}
|
||||
numDatabases := len(dis)
|
||||
|
@ -384,7 +415,7 @@ func (s *Server) reportServer() {
|
|||
|
||||
clusterID := s.MetaClient.ClusterID()
|
||||
if err != nil {
|
||||
log.Printf("failed to retrieve cluster ID for reporting: %s", err.Error())
|
||||
s.Logger.Printf("failed to retrieve cluster ID for reporting: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -407,7 +438,7 @@ func (s *Server) reportServer() {
|
|||
},
|
||||
}
|
||||
|
||||
log.Printf("Sending anonymous usage statistics to m.influxdb.com")
|
||||
s.Logger.Printf("Sending anonymous usage statistics to m.influxdb.com")
|
||||
|
||||
go cl.Save(usage)
|
||||
}
|
||||
|
@ -444,6 +475,7 @@ func (s *Server) MetaServers() []string {
|
|||
|
||||
// Service represents a service attached to the server.
|
||||
type Service interface {
|
||||
SetLogOutput(w io.Writer)
|
||||
Open() error
|
||||
Close() error
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
@ -480,19 +479,7 @@ func writeTestData(s *Server, t *Test) error {
|
|||
func configureLogging(s *Server) {
|
||||
// Set the logger to discard unless verbose is on
|
||||
if !testing.Verbose() {
|
||||
type logSetter interface {
|
||||
SetLogger(*log.Logger)
|
||||
}
|
||||
nullLogger := log.New(ioutil.Discard, "", 0)
|
||||
s.TSDBStore.Logger = nullLogger
|
||||
s.Monitor.SetLogger(nullLogger)
|
||||
s.QueryExecutor.LogOutput = ioutil.Discard
|
||||
s.Subscriber.SetLogger(nullLogger)
|
||||
for _, service := range s.Services {
|
||||
if service, ok := service.(logSetter); ok {
|
||||
service.SetLogger(nullLogger)
|
||||
}
|
||||
}
|
||||
s.SetLogOutput(ioutil.Discard)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -108,9 +108,9 @@ type QueryExecutor struct {
|
|||
// Maximum number of concurrent queries.
|
||||
MaxConcurrentQueries int
|
||||
|
||||
// Output of all logging.
|
||||
// Logger to use for all logging.
|
||||
// Defaults to discarding all log output.
|
||||
LogOutput io.Writer
|
||||
Logger *log.Logger
|
||||
|
||||
// Used for managing and tracking running queries.
|
||||
queries map[uint64]*QueryTask
|
||||
|
@ -126,7 +126,7 @@ type QueryExecutor struct {
|
|||
func NewQueryExecutor() *QueryExecutor {
|
||||
return &QueryExecutor{
|
||||
QueryTimeout: DefaultQueryTimeout,
|
||||
LogOutput: ioutil.Discard,
|
||||
Logger: log.New(ioutil.Discard, "[query] ", log.LstdFlags),
|
||||
queries: make(map[uint64]*QueryTask),
|
||||
nextID: 1,
|
||||
statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil),
|
||||
|
@ -147,6 +147,12 @@ func (e *QueryExecutor) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (e *QueryExecutor) SetLogOutput(w io.Writer) {
|
||||
e.Logger = log.New(w, "[query] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// ExecuteQuery executes each statement within a query.
|
||||
func (e *QueryExecutor) ExecuteQuery(query *Query, database string, chunkSize int, closing chan struct{}) <-chan *Result {
|
||||
results := make(chan *Result)
|
||||
|
@ -171,8 +177,6 @@ func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize in
|
|||
}
|
||||
defer e.killQuery(qid)
|
||||
|
||||
logger := e.logger()
|
||||
|
||||
// Setup the execution context that will be used when executing statements.
|
||||
ctx := ExecutionContext{
|
||||
QueryID: qid,
|
||||
|
@ -180,7 +184,7 @@ func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize in
|
|||
Results: results,
|
||||
Database: database,
|
||||
ChunkSize: chunkSize,
|
||||
Log: logger,
|
||||
Log: e.Logger,
|
||||
InterruptCh: task.closing,
|
||||
}
|
||||
|
||||
|
@ -214,7 +218,7 @@ loop:
|
|||
}
|
||||
|
||||
// Log each normalized statement.
|
||||
logger.Println(stmt.String())
|
||||
e.Logger.Println(stmt.String())
|
||||
|
||||
// Handle a query management queries specially so they don't go
|
||||
// to the underlying statement executor.
|
||||
|
@ -314,10 +318,6 @@ func (e *QueryExecutor) executeShowQueriesStatement(q *ShowQueriesStatement) (mo
|
|||
}}, nil
|
||||
}
|
||||
|
||||
func (e *QueryExecutor) logger() *log.Logger {
|
||||
return log.New(e.LogOutput, "[query] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
func (e *QueryExecutor) query(qid uint64) (*QueryTask, bool) {
|
||||
e.mu.RLock()
|
||||
query, ok := e.queries[qid]
|
||||
|
|
|
@ -3,6 +3,7 @@ package monitor // import "github.com/influxdata/influxdb/monitor"
|
|||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"runtime"
|
||||
|
@ -116,9 +117,10 @@ func (m *Monitor) Close() {
|
|||
m.done = nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (m *Monitor) SetLogger(l *log.Logger) {
|
||||
m.Logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (m *Monitor) SetLogOutput(w io.Writer) {
|
||||
m.Logger = log.New(w, "[monitor] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// RegisterDiagnosticsClient registers a diagnostics client with the given name and tags.
|
||||
|
|
|
@ -3,6 +3,7 @@ package admin // import "github.com/influxdata/influxdb/services/admin"
|
|||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
|
@ -81,9 +82,10 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.logger = log.New(w, "[admin] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Err returns a channel for fatal errors that occur on the listener.
|
||||
|
|
|
@ -3,6 +3,7 @@ package collectd // import "github.com/influxdata/influxdb/services/collectd"
|
|||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
|
@ -167,9 +168,10 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.Logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.Logger = log.New(w, "[collectd] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// SetTypes sets collectd types db.
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
|
@ -127,9 +128,10 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.Logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.Logger = log.New(w, "[continuous_querier] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Run runs the specified continuous query, or all CQs if none is specified.
|
||||
|
|
|
@ -61,9 +61,10 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.Logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.Logger = log.New(w, "[copier] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Err returns a channel for fatal out-of-band errors.
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"encoding/binary"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -102,7 +101,7 @@ func NewService() *Service {
|
|||
s.Service.TSDBStore = &s.TSDBStore
|
||||
|
||||
if !testing.Verbose() {
|
||||
s.SetLogger(log.New(ioutil.Discard, "", 0))
|
||||
s.SetLogOutput(ioutil.Discard)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bufio"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
|
@ -200,9 +201,10 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.logger = log.New(w, "[graphite] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Addr returns the address the Service binds to.
|
||||
|
|
|
@ -158,9 +158,9 @@ func (h *Handler) AddRoutes(routes ...Route) {
|
|||
handler = cors(handler)
|
||||
handler = requestID(handler)
|
||||
if h.loggingEnabled && r.LoggingEnabled {
|
||||
handler = logging(handler, r.Name, h.Logger)
|
||||
handler = h.logging(handler, r.Name)
|
||||
}
|
||||
handler = recovery(handler, r.Name, h.Logger) // make sure recovery is always last
|
||||
handler = h.recovery(handler, r.Name) // make sure recovery is always last
|
||||
|
||||
h.mux.Add(r.Method, r.Pattern, handler)
|
||||
|
||||
|
@ -768,17 +768,17 @@ func requestID(inner http.Handler) http.Handler {
|
|||
})
|
||||
}
|
||||
|
||||
func logging(inner http.Handler, name string, weblog *log.Logger) http.Handler {
|
||||
func (h *Handler) logging(inner http.Handler, name string) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
l := &responseLogger{w: w}
|
||||
inner.ServeHTTP(l, r)
|
||||
logLine := buildLogLine(l, r, start)
|
||||
weblog.Println(logLine)
|
||||
h.Logger.Println(logLine)
|
||||
})
|
||||
}
|
||||
|
||||
func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler {
|
||||
func (h *Handler) recovery(inner http.Handler, name string) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
l := &responseLogger{w: w}
|
||||
|
@ -787,7 +787,7 @@ func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler
|
|||
if err := recover(); err != nil {
|
||||
logLine := buildLogLine(l, r, start)
|
||||
logLine = fmt.Sprintf(`%s [panic:%s]`, logLine, err)
|
||||
weblog.Println(logLine)
|
||||
h.Logger.Println(logLine)
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"crypto/tls"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
|
@ -130,9 +131,12 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
l := log.New(w, "[httpd]", log.LstdFlags)
|
||||
s.Logger = l
|
||||
s.Handler.Logger = l
|
||||
}
|
||||
|
||||
// Err returns a channel for fatal errors that occur on the listener.
|
||||
|
|
|
@ -957,10 +957,12 @@ func (c *Client) MarshalBinary() ([]byte, error) {
|
|||
return c.cacheData.MarshalBinary()
|
||||
}
|
||||
|
||||
func (c *Client) SetLogger(l *log.Logger) {
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (c *Client) SetLogOutput(w io.Writer) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.logger = l
|
||||
c.logger = log.New(w, "[metaclient] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
func (c *Client) updateAuthCache() {
|
||||
|
|
|
@ -177,8 +177,11 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) { s.Logger = l }
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.Logger = log.New(w, "[opentsdb] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Err returns a channel for fatal errors that occur on the listener.
|
||||
func (s *Service) Err() <-chan error { return s.err }
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package precreator // import "github.com/influxdata/influxdb/services/precreator"
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
|
@ -33,9 +34,10 @@ func NewService(c Config) (*Service, error) {
|
|||
return &s, nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.Logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.Logger = log.New(w, "[shard-precreation] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Open starts the precreation service.
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package retention // import "github.com/influxdata/influxdb/services/retention"
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
|
@ -54,9 +55,10 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.logger = log.New(w, "[retention] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
func (s *Service) deleteShardGroups() {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
|
@ -71,9 +72,10 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.Logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.Logger = log.New(w, "[snapshot] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Err returns a channel for fatal out-of-band errors.
|
||||
|
|
|
@ -3,6 +3,7 @@ package subscriber // import "github.com/influxdata/influxdb/services/subscriber
|
|||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/url"
|
||||
"os"
|
||||
|
@ -108,9 +109,10 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.Logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.Logger = log.New(w, "[subscriber] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
func (s *Service) waitForMetaUpdates() {
|
||||
|
|
|
@ -3,6 +3,7 @@ package udp // import "github.com/influxdata/influxdb/services/udp"
|
|||
import (
|
||||
"errors"
|
||||
"expvar"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
|
@ -214,9 +215,10 @@ func (s *Service) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) {
|
||||
s.Logger = l
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
s.Logger = log.New(w, "[udp] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Addr returns the listener's address
|
||||
|
|
|
@ -3,6 +3,7 @@ package tsm1
|
|||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"sort"
|
||||
|
@ -470,6 +471,12 @@ func (cl *CacheLoader) Load(cache *Cache) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SetLogOutput sets the logger used for all messages. It must not be called
|
||||
// after the Open method has been called.
|
||||
func (cl *CacheLoader) SetLogOutput(w io.Writer) {
|
||||
cl.Logger = log.New(w, "[cacheloader] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Updates the age statistic
|
||||
func (c *Cache) UpdateAge() {
|
||||
c.mu.RLock()
|
||||
|
|
|
@ -37,8 +37,9 @@ type Engine struct {
|
|||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
path string
|
||||
logger *log.Logger
|
||||
path string
|
||||
logger *log.Logger
|
||||
logOutput io.Writer
|
||||
|
||||
// TODO(benbjohnson): Index needs to be moved entirely into engine.
|
||||
index *tsdb.DatabaseIndex
|
||||
|
@ -79,7 +80,6 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
|
|||
|
||||
e := &Engine{
|
||||
path: path,
|
||||
logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags),
|
||||
measurementFields: make(map[string]*tsdb.MeasurementFields),
|
||||
|
||||
WAL: w,
|
||||
|
@ -96,6 +96,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
|
|||
CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize,
|
||||
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
|
||||
}
|
||||
e.SetLogOutput(os.Stderr)
|
||||
|
||||
return e
|
||||
}
|
||||
|
@ -194,8 +195,14 @@ func (e *Engine) Close() error {
|
|||
return e.WAL.Close()
|
||||
}
|
||||
|
||||
// SetLogOutput is a no-op.
|
||||
func (e *Engine) SetLogOutput(w io.Writer) {}
|
||||
// SetLogOutput sets the logger used for all messages. It must not be called
|
||||
// after the Open method has been called.
|
||||
func (e *Engine) SetLogOutput(w io.Writer) {
|
||||
e.logger = log.New(w, "[tsm1] ", log.LstdFlags)
|
||||
e.WAL.SetLogOutput(w)
|
||||
e.FileStore.SetLogOutput(w)
|
||||
e.logOutput = w
|
||||
}
|
||||
|
||||
// LoadMetadataIndex loads the shard metadata into memory.
|
||||
func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex) error {
|
||||
|
@ -660,6 +667,7 @@ func (e *Engine) reloadCache() error {
|
|||
}
|
||||
|
||||
loader := NewCacheLoader(files)
|
||||
loader.SetLogOutput(e.logOutput)
|
||||
if err := loader.Load(e.Cache); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package tsm1
|
|||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -145,6 +146,12 @@ func NewFileStore(dir string) *FileStore {
|
|||
}
|
||||
}
|
||||
|
||||
// SetLogOutput sets the logger used for all messages. It must not be called
|
||||
// after the Open method has been called.
|
||||
func (f *FileStore) SetLogOutput(w io.Writer) {
|
||||
f.Logger = log.New(w, "[filestore] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Returns the number of TSM files currently loaded
|
||||
func (f *FileStore) Count() int {
|
||||
f.mu.RLock()
|
||||
|
|
|
@ -108,6 +108,12 @@ func NewWAL(path string) *WAL {
|
|||
}
|
||||
}
|
||||
|
||||
// SetLogOutput sets the location that logs are written to. It must not be
|
||||
// called after the Open method has been called.
|
||||
func (l *WAL) SetLogOutput(w io.Writer) {
|
||||
l.logger = log.New(w, "[tsm1wal] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Path returns the path the log was initialized with.
|
||||
func (l *WAL) Path() string {
|
||||
l.mu.RLock()
|
||||
|
|
|
@ -121,6 +121,15 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
|
|||
}
|
||||
}
|
||||
|
||||
// SetLogOutput sets the writer to which log output will be written. It must
|
||||
// not be called after the Open method has been called.
|
||||
func (s *Shard) SetLogOutput(w io.Writer) {
|
||||
s.LogOutput = w
|
||||
if !s.closed() {
|
||||
s.engine.SetLogOutput(w)
|
||||
}
|
||||
}
|
||||
|
||||
// Path returns the path set on the shard when it was created.
|
||||
func (s *Shard) Path() string { return s.path }
|
||||
|
||||
|
|
|
@ -42,6 +42,9 @@ type Store struct {
|
|||
EngineOptions EngineOptions
|
||||
Logger *log.Logger
|
||||
|
||||
// logOutput is where output from the underlying databases will go.
|
||||
logOutput io.Writer
|
||||
|
||||
closing chan struct{}
|
||||
wg sync.WaitGroup
|
||||
opened bool
|
||||
|
@ -57,6 +60,17 @@ func NewStore(path string) *Store {
|
|||
path: path,
|
||||
EngineOptions: opts,
|
||||
Logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
|
||||
logOutput: os.Stderr,
|
||||
}
|
||||
}
|
||||
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Store) SetLogOutput(w io.Writer) {
|
||||
s.Logger = log.New(w, "[store]", log.LstdFlags)
|
||||
s.logOutput = w
|
||||
for _, s := range s.shards {
|
||||
s.SetLogOutput(w)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -158,6 +172,7 @@ func (s *Store) loadShards() error {
|
|||
}
|
||||
|
||||
shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions)
|
||||
shard.SetLogOutput(s.logOutput)
|
||||
|
||||
err = shard.Open()
|
||||
if err != nil {
|
||||
|
@ -283,6 +298,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
|
|||
|
||||
path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
|
||||
shard := NewShard(shardID, db, path, walPath, s.EngineOptions)
|
||||
shard.SetLogOutput(s.logOutput)
|
||||
if err := shard.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue