From 60c2852d31b37730763d20f12af37a88eb427401 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 5 Mar 2015 16:20:57 -0800 Subject: [PATCH 1/3] Add support for enabling write-tracing --- cmd/influxd/config.go | 3 ++- cmd/influxd/config_test.go | 1 + etc/config.sample.toml | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 8254788646..7485dda029 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -96,7 +96,8 @@ type Config struct { } `toml:"cluster"` Logging struct { - File string `toml:"file"` + File string `toml:"file"` + WriteTraceEnabled bool `toml:"write-tracing"` } `toml:"logging"` ContinuousQuery struct { diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index 10a9bed00a..4e6b4ac88e 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -174,6 +174,7 @@ enabled = true [logging] file = "influxdb.log" +write-tracing = true # Configure the admin server [admin] diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 8448404bc3..28444afdc3 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -83,3 +83,4 @@ dir = "/tmp/influxdb/development/state" [logging] file = "/var/log/influxdb/influxd.log" # Leave blank to redirect logs to stderr. +write-tracing = false # If true, enables detailed logging of the write system. From 76c255a3c8ef360b9b5fadbbe2fc6499c9aa5798 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 5 Mar 2015 16:27:01 -0800 Subject: [PATCH 2/3] Add trace-level logging to server write path --- cmd/influxd/run.go | 1 + server.go | 32 +++++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 447008d3ca..5792565e11 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -262,6 +262,7 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b // Create and open the server. s := influxdb.NewServer() s.SetLogOutput(w) + s.WriteTrace = config.Logging.WriteTraceEnabled s.RecomputePreviousN = config.ContinuousQuery.RecomputePreviousN s.RecomputeNoOlderThan = time.Duration(config.ContinuousQuery.RecomputeNoOlderThan) s.ComputeRunsPerInterval = config.ContinuousQuery.ComputeRunsPerInterval diff --git a/server.go b/server.go index 4abf9c6728..66c49cdf8e 100644 --- a/server.go +++ b/server.go @@ -65,7 +65,8 @@ type Server struct { shards map[uint64]*Shard // shards by shard id - Logger *log.Logger + Logger *log.Logger + WriteTrace bool // Detailed logging of write path authenticationEnabled bool @@ -1369,6 +1370,11 @@ type Point struct { // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) { + if s.WriteTrace { + log.Printf("received write for database '%s', retention policy '%s', with %d points", + database, retentionPolicy, len(points)) + } + // Make sure every point has at least one field. for _, p := range points { if len(p.Fields) == 0 { @@ -1391,11 +1397,17 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( if err := s.createMeasurementsIfNotExists(database, retentionPolicy, points); err != nil { return 0, err } + if s.WriteTrace { + log.Printf("measurements and series created on database '%s'", database) + } // Ensure all the required shard groups exist. TODO: this should be done async. if err := s.createShardGroupsIfNotExists(database, retentionPolicy, points); err != nil { return 0, err } + if s.WriteTrace { + log.Printf("shard groups created for database '%s'", database) + } // Build writeRawSeriesMessageType publish commands. shardData := make(map[uint64][]byte, 0) @@ -1420,9 +1432,15 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( if err != nil { return err } + if s.WriteTrace { + log.Printf("shard group located: %v", g) + } // Find appropriate shard within the shard group. sh := g.ShardBySeriesID(series.ID) + if s.WriteTrace { + log.Printf("shard located: %v", sh) + } // Many points are likely to have the same Measurement name. Re-use codecs if possible. var codec *FieldCodec @@ -1445,6 +1463,9 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( shardData[sh.ID] = make([]byte, 0) } shardData[sh.ID] = append(shardData[sh.ID], data...) + if s.WriteTrace { + log.Printf("data appended to buffer for shard %d", sh.ID) + } } return nil @@ -1467,6 +1488,9 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( if index > maxIndex { maxIndex = index } + if s.WriteTrace { + log.Printf("write series message published successfully for topic %d", i) + } } return maxIndex, err @@ -1481,10 +1505,16 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error { if sh == nil { return ErrShardNotFound } + if s.WriteTrace { + log.Printf("received write message for application, shard %d", sh.ID) + } if err := sh.writeSeries(m.Data); err != nil { return err } + if s.WriteTrace { + log.Printf("write message successfully applied to shard %d", sh.ID) + } return nil } From 2402467e63a416d6fa19062d93f86844d24b7a76 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 5 Mar 2015 17:01:02 -0800 Subject: [PATCH 3/3] Add trace-level logging of write to handler --- CHANGELOG.md | 1 + cmd/influxd/run.go | 3 +++ httpd/handler.go | 31 +++++++++++++++++++++++++------ 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8645f3944..b48aec90ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ ### Features - [#1755](https://github.com/influxdb/influxdb/pull/1848): Support JSON data ingest over UDP - [#1857](https://github.com/influxdb/influxdb/pull/1857): Support retention policies with infinite duration +- [#1858](https://github.com/influxdb/influxdb/pull/1858): Enable detailed tracing of write path ## v0.9.0-rc7 [2015-03-02] diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 5792565e11..712177c20d 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -80,6 +80,9 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B // Start the server handler. Attach to broker if listening on the same port. if s != nil { sh := httpd.NewHandler(s, config.Authentication.Enabled, version) + sh.SetLogOutput(logWriter) + sh.WriteTrace = config.Logging.WriteTraceEnabled + if h != nil && config.BrokerAddr() == config.DataAddr() { h.serverHandler = sh } else { diff --git a/httpd/handler.go b/httpd/handler.go index 94749b7a18..2cb601e2bb 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "log" "math" "net/http" @@ -43,6 +44,9 @@ type Handler struct { routes []route mux *pat.PatternServeMux requireAuthentication bool + + Logger *log.Logger + WriteTrace bool // Detailed logging of write path } // NewHandler returns a new instance of Handler. @@ -51,10 +55,9 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) server: s, mux: pat.New(), requireAuthentication: requireAuthentication, + Logger: log.New(os.Stderr, "[http] ", log.LstdFlags), } - weblog := log.New(os.Stderr, `[http] `, 0) - h.routes = append(h.routes, route{ "query", // Query serving route. @@ -129,9 +132,9 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) handler = cors(handler) handler = requestID(handler) if r.log { - handler = logging(handler, r.name, weblog) + handler = logging(handler, r.name, h.Logger) } - handler = recovery(handler, r.name, weblog) // make sure recovery is always last + handler = recovery(handler, r.name, h.Logger) // make sure recovery is always last h.mux.Add(r.method, r.pattern, handler) } @@ -139,7 +142,12 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) return h } -//ServeHTTP responds to HTTP request to the handler. +// SetLogOutput sets writer for all handler log output. +func (h *Handler) SetLogOutput(w io.Writer) { + h.Logger = log.New(w, "[http] ", log.LstdFlags) +} + +// ServeHTTP responds to HTTP request to the handler. func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.mux.ServeHTTP(w, r) } @@ -168,8 +176,19 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ // serveWrite receives incoming series data and writes it to the database. func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) { var bp influxdb.BatchPoints + var dec *json.Decoder - dec := json.NewDecoder(r.Body) + if h.WriteTrace { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + h.Logger.Print("write handler failed to read bytes from request body") + } else { + h.Logger.Printf("write body received by handler: %s", string(b)) + } + dec = json.NewDecoder(strings.NewReader(string(b))) + } else { + dec = json.NewDecoder(r.Body) + } var writeError = func(result influxdb.Result, statusCode int) { w.WriteHeader(statusCode)