Merge pull request #1858 from influxdb/server_stats

Basic write-path logging
pull/1860/head v0.9.0-rc8
Philip O'Toole 2015-03-05 17:40:51 -08:00
commit e997fda05e
7 changed files with 65 additions and 8 deletions

View File

@ -8,6 +8,7 @@
### Features ### Features
- [#1755](https://github.com/influxdb/influxdb/pull/1848): Support JSON data ingest over UDP - [#1755](https://github.com/influxdb/influxdb/pull/1848): Support JSON data ingest over UDP
- [#1857](https://github.com/influxdb/influxdb/pull/1857): Support retention policies with infinite duration - [#1857](https://github.com/influxdb/influxdb/pull/1857): Support retention policies with infinite duration
- [#1858](https://github.com/influxdb/influxdb/pull/1858): Enable detailed tracing of write path
## v0.9.0-rc7 [2015-03-02] ## v0.9.0-rc7 [2015-03-02]

View File

@ -96,7 +96,8 @@ type Config struct {
} `toml:"cluster"` } `toml:"cluster"`
Logging struct { Logging struct {
File string `toml:"file"` File string `toml:"file"`
WriteTraceEnabled bool `toml:"write-tracing"`
} `toml:"logging"` } `toml:"logging"`
ContinuousQuery struct { ContinuousQuery struct {

View File

@ -174,6 +174,7 @@ enabled = true
[logging] [logging]
file = "influxdb.log" file = "influxdb.log"
write-tracing = true
# Configure the admin server # Configure the admin server
[admin] [admin]

View File

@ -80,6 +80,9 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
// Start the server handler. Attach to broker if listening on the same port. // Start the server handler. Attach to broker if listening on the same port.
if s != nil { if s != nil {
sh := httpd.NewHandler(s, config.Authentication.Enabled, version) sh := httpd.NewHandler(s, config.Authentication.Enabled, version)
sh.SetLogOutput(logWriter)
sh.WriteTrace = config.Logging.WriteTraceEnabled
if h != nil && config.BrokerAddr() == config.DataAddr() { if h != nil && config.BrokerAddr() == config.DataAddr() {
h.serverHandler = sh h.serverHandler = sh
} else { } else {
@ -262,6 +265,7 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b
// Create and open the server. // Create and open the server.
s := influxdb.NewServer() s := influxdb.NewServer()
s.SetLogOutput(w) s.SetLogOutput(w)
s.WriteTrace = config.Logging.WriteTraceEnabled
s.RecomputePreviousN = config.ContinuousQuery.RecomputePreviousN s.RecomputePreviousN = config.ContinuousQuery.RecomputePreviousN
s.RecomputeNoOlderThan = time.Duration(config.ContinuousQuery.RecomputeNoOlderThan) s.RecomputeNoOlderThan = time.Duration(config.ContinuousQuery.RecomputeNoOlderThan)
s.ComputeRunsPerInterval = config.ContinuousQuery.ComputeRunsPerInterval s.ComputeRunsPerInterval = config.ContinuousQuery.ComputeRunsPerInterval

View File

@ -83,3 +83,4 @@ dir = "/tmp/influxdb/development/state"
[logging] [logging]
file = "/var/log/influxdb/influxd.log" # Leave blank to redirect logs to stderr. file = "/var/log/influxdb/influxd.log" # Leave blank to redirect logs to stderr.
write-tracing = false # If true, enables detailed logging of the write system.

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"math" "math"
"net/http" "net/http"
@ -43,6 +44,9 @@ type Handler struct {
routes []route routes []route
mux *pat.PatternServeMux mux *pat.PatternServeMux
requireAuthentication bool requireAuthentication bool
Logger *log.Logger
WriteTrace bool // Detailed logging of write path
} }
// NewHandler returns a new instance of Handler. // NewHandler returns a new instance of Handler.
@ -51,10 +55,9 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string)
server: s, server: s,
mux: pat.New(), mux: pat.New(),
requireAuthentication: requireAuthentication, requireAuthentication: requireAuthentication,
Logger: log.New(os.Stderr, "[http] ", log.LstdFlags),
} }
weblog := log.New(os.Stderr, `[http] `, 0)
h.routes = append(h.routes, h.routes = append(h.routes,
route{ route{
"query", // Query serving route. "query", // Query serving route.
@ -129,9 +132,9 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string)
handler = cors(handler) handler = cors(handler)
handler = requestID(handler) handler = requestID(handler)
if r.log { if r.log {
handler = logging(handler, r.name, weblog) handler = logging(handler, r.name, h.Logger)
} }
handler = recovery(handler, r.name, weblog) // make sure recovery is always last handler = recovery(handler, r.name, h.Logger) // make sure recovery is always last
h.mux.Add(r.method, r.pattern, handler) h.mux.Add(r.method, r.pattern, handler)
} }
@ -139,7 +142,12 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string)
return h return h
} }
//ServeHTTP responds to HTTP request to the handler. // SetLogOutput sets writer for all handler log output.
func (h *Handler) SetLogOutput(w io.Writer) {
h.Logger = log.New(w, "[http] ", log.LstdFlags)
}
// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.mux.ServeHTTP(w, r) h.mux.ServeHTTP(w, r)
} }
@ -168,8 +176,19 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ
// serveWrite receives incoming series data and writes it to the database. // serveWrite receives incoming series data and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) { func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
var bp influxdb.BatchPoints var bp influxdb.BatchPoints
var dec *json.Decoder
dec := json.NewDecoder(r.Body) if h.WriteTrace {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
h.Logger.Print("write handler failed to read bytes from request body")
} else {
h.Logger.Printf("write body received by handler: %s", string(b))
}
dec = json.NewDecoder(strings.NewReader(string(b)))
} else {
dec = json.NewDecoder(r.Body)
}
var writeError = func(result influxdb.Result, statusCode int) { var writeError = func(result influxdb.Result, statusCode int) {
w.WriteHeader(statusCode) w.WriteHeader(statusCode)

View File

@ -65,7 +65,8 @@ type Server struct {
shards map[uint64]*Shard // shards by shard id shards map[uint64]*Shard // shards by shard id
Logger *log.Logger Logger *log.Logger
WriteTrace bool // Detailed logging of write path
authenticationEnabled bool authenticationEnabled bool
@ -1369,6 +1370,11 @@ type Point struct {
// WriteSeries writes series data to the database. // WriteSeries writes series data to the database.
// Returns the messaging index the data was written to. // Returns the messaging index the data was written to.
func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) { func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) {
if s.WriteTrace {
log.Printf("received write for database '%s', retention policy '%s', with %d points",
database, retentionPolicy, len(points))
}
// Make sure every point has at least one field. // Make sure every point has at least one field.
for _, p := range points { for _, p := range points {
if len(p.Fields) == 0 { if len(p.Fields) == 0 {
@ -1391,11 +1397,17 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
if err := s.createMeasurementsIfNotExists(database, retentionPolicy, points); err != nil { if err := s.createMeasurementsIfNotExists(database, retentionPolicy, points); err != nil {
return 0, err return 0, err
} }
if s.WriteTrace {
log.Printf("measurements and series created on database '%s'", database)
}
// Ensure all the required shard groups exist. TODO: this should be done async. // Ensure all the required shard groups exist. TODO: this should be done async.
if err := s.createShardGroupsIfNotExists(database, retentionPolicy, points); err != nil { if err := s.createShardGroupsIfNotExists(database, retentionPolicy, points); err != nil {
return 0, err return 0, err
} }
if s.WriteTrace {
log.Printf("shard groups created for database '%s'", database)
}
// Build writeRawSeriesMessageType publish commands. // Build writeRawSeriesMessageType publish commands.
shardData := make(map[uint64][]byte, 0) shardData := make(map[uint64][]byte, 0)
@ -1420,9 +1432,15 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
if err != nil { if err != nil {
return err return err
} }
if s.WriteTrace {
log.Printf("shard group located: %v", g)
}
// Find appropriate shard within the shard group. // Find appropriate shard within the shard group.
sh := g.ShardBySeriesID(series.ID) sh := g.ShardBySeriesID(series.ID)
if s.WriteTrace {
log.Printf("shard located: %v", sh)
}
// Many points are likely to have the same Measurement name. Re-use codecs if possible. // Many points are likely to have the same Measurement name. Re-use codecs if possible.
var codec *FieldCodec var codec *FieldCodec
@ -1445,6 +1463,9 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
shardData[sh.ID] = make([]byte, 0) shardData[sh.ID] = make([]byte, 0)
} }
shardData[sh.ID] = append(shardData[sh.ID], data...) shardData[sh.ID] = append(shardData[sh.ID], data...)
if s.WriteTrace {
log.Printf("data appended to buffer for shard %d", sh.ID)
}
} }
return nil return nil
@ -1467,6 +1488,9 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
if index > maxIndex { if index > maxIndex {
maxIndex = index maxIndex = index
} }
if s.WriteTrace {
log.Printf("write series message published successfully for topic %d", i)
}
} }
return maxIndex, err return maxIndex, err
@ -1481,10 +1505,16 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
if sh == nil { if sh == nil {
return ErrShardNotFound return ErrShardNotFound
} }
if s.WriteTrace {
log.Printf("received write message for application, shard %d", sh.ID)
}
if err := sh.writeSeries(m.Data); err != nil { if err := sh.writeSeries(m.Data); err != nil {
return err return err
} }
if s.WriteTrace {
log.Printf("write message successfully applied to shard %d", sh.ID)
}
return nil return nil
} }