Merge pull request #3996 from influxdb/httpd_stats2

Add stats to httpd package
pull/4008/head
Philip O'Toole 2015-09-04 12:38:57 -07:00
commit a500eb6c6d
4 changed files with 60 additions and 10 deletions

View File

@ -12,6 +12,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service - [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service
- [#3986](https://github.com/influxdb/influxdb/pull/3986): Support sorting by time desc - [#3986](https://github.com/influxdb/influxdb/pull/3986): Support sorting by time desc
- [#3930](https://github.com/influxdb/influxdb/pull/3930): Wire up TOP aggregate function - fixes [#1821](https://github.com/influxdb/influxdb/issues/1821) - [#3930](https://github.com/influxdb/influxdb/pull/3930): Wire up TOP aggregate function - fixes [#1821](https://github.com/influxdb/influxdb/issues/1821)
- [#3996](https://github.com/influxdb/influxdb/pull/3996): Add statistics to httpd package
### Bugfixes ### Bugfixes
- [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803. - [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.

View File

@ -74,16 +74,18 @@ type Handler struct {
Logger *log.Logger Logger *log.Logger
loggingEnabled bool // Log every HTTP access. loggingEnabled bool // Log every HTTP access.
WriteTrace bool // Detailed logging of write path WriteTrace bool // Detailed logging of write path
statMap *expvar.Map
} }
// NewHandler returns a new instance of handler with routes. // NewHandler returns a new instance of handler with routes.
func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool) *Handler { func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap *expvar.Map) *Handler {
h := &Handler{ h := &Handler{
mux: pat.New(), mux: pat.New(),
requireAuthentication: requireAuthentication, requireAuthentication: requireAuthentication,
Logger: log.New(os.Stderr, "[http] ", log.LstdFlags), Logger: log.New(os.Stderr, "[http] ", log.LstdFlags),
loggingEnabled: loggingEnabled, loggingEnabled: loggingEnabled,
WriteTrace: writeTrace, WriteTrace: writeTrace,
statMap: statMap,
} }
h.SetRoutes([]route{ h.SetRoutes([]route{
@ -150,6 +152,8 @@ func (h *Handler) SetRoutes(routes []route) {
// ServeHTTP responds to HTTP request to the handler. // ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.statMap.Add(statRequest, 1)
// FIXME(benbjohnson): Add pprof enabled flag. // FIXME(benbjohnson): Add pprof enabled flag.
if strings.HasPrefix(r.URL.Path, "/debug/pprof") { if strings.HasPrefix(r.URL.Path, "/debug/pprof") {
switch r.URL.Path { switch r.URL.Path {
@ -170,6 +174,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
h.statMap.Add(statCQRequest, 1)
// If the continuous query service isn't configured, return 404. // If the continuous query service isn't configured, return 404.
if h.ContinuousQuerier == nil { if h.ContinuousQuerier == nil {
w.WriteHeader(http.StatusNotImplemented) w.WriteHeader(http.StatusNotImplemented)
@ -210,6 +216,8 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R
// serveQuery parses an incoming query and, if valid, executes the query. // serveQuery parses an incoming query and, if valid, executes the query.
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
h.statMap.Add(statQueryRequest, 1)
q := r.URL.Query() q := r.URL.Query()
pretty := q.Get("pretty") == "true" pretty := q.Get("pretty") == "true"
@ -288,9 +296,10 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
// Write out result immediately if chunked. // Write out result immediately if chunked.
if chunked { if chunked {
w.Write(MarshalJSON(Response{ n, _ := w.Write(MarshalJSON(Response{
Results: []*influxql.Result{r}, Results: []*influxql.Result{r},
}, pretty)) }, pretty))
h.statMap.Add(statQueryRequestBytesTransmitted, int64(n))
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
continue continue
} }
@ -327,11 +336,13 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
// If it's not chunked we buffered everything in memory, so write it out // If it's not chunked we buffered everything in memory, so write it out
if !chunked { if !chunked {
w.Write(MarshalJSON(resp, pretty)) n, _ := w.Write(MarshalJSON(resp, pretty))
h.statMap.Add(statQueryRequestBytesTransmitted, int64(n))
} }
} }
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
h.statMap.Add(statWriteRequest, 1)
// Handle gzip decoding of the body // Handle gzip decoding of the body
body := r.Body body := r.Body
@ -353,6 +364,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return return
} }
h.statMap.Add(statWriteRequestBytesReceived, int64(len(b)))
if h.WriteTrace { if h.WriteTrace {
h.Logger.Printf("write body received by handler: %s", string(b)) h.Logger.Printf("write body received by handler: %s", string(b))
} }
@ -415,13 +427,16 @@ func (h *Handler) serveWriteJSON(w http.ResponseWriter, r *http.Request, body []
RetentionPolicy: bp.RetentionPolicy, RetentionPolicy: bp.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelOne, ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: points, Points: points,
}); influxdb.IsClientError(err) { }); err != nil {
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) h.statMap.Add(statPointsWrittenFail, int64(len(points)))
return if influxdb.IsClientError(err) {
} else if err != nil { h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError) } else {
h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
}
return return
} }
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
@ -512,13 +527,16 @@ func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []
ConsistencyLevel: consistency, ConsistencyLevel: consistency,
Points: points, Points: points,
}); influxdb.IsClientError(err) { }); influxdb.IsClientError(err) {
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return return
} else if err != nil { } else if err != nil {
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError) h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
return return
} }
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
@ -529,6 +547,7 @@ func (h *Handler) serveOptions(w http.ResponseWriter, r *http.Request) {
// servePing returns a simple response to let the client know the server is running. // servePing returns a simple response to let the client know the server is running.
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) { func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
h.statMap.Add(statPingRequest, 1)
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
@ -667,16 +686,19 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
if requireAuthentication && len(uis) > 0 { if requireAuthentication && len(uis) > 0 {
username, password, err := parseCredentials(r) username, password, err := parseCredentials(r)
if err != nil { if err != nil {
h.statMap.Add(statAuthFail, 1)
httpError(w, err.Error(), false, http.StatusUnauthorized) httpError(w, err.Error(), false, http.StatusUnauthorized)
return return
} }
if username == "" { if username == "" {
h.statMap.Add(statAuthFail, 1)
httpError(w, "username required", false, http.StatusUnauthorized) httpError(w, "username required", false, http.StatusUnauthorized)
return return
} }
user, err = h.MetaStore.Authenticate(username, password) user, err = h.MetaStore.Authenticate(username, password)
if err != nil { if err != nil {
h.statMap.Add(statAuthFail, 1)
httpError(w, err.Error(), false, http.StatusUnauthorized) httpError(w, err.Error(), false, http.StatusUnauthorized)
return return
} }

View File

@ -12,6 +12,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/meta"
@ -365,8 +366,9 @@ type Handler struct {
// NewHandler returns a new instance of Handler. // NewHandler returns a new instance of Handler.
func NewHandler(requireAuthentication bool) *Handler { func NewHandler(requireAuthentication bool) *Handler {
statMap := influxdb.NewStatistics("httpd", "httpd", nil)
h := &Handler{ h := &Handler{
Handler: httpd.NewHandler(requireAuthentication, true, false), Handler: httpd.NewHandler(requireAuthentication, true, false, statMap),
} }
h.Handler.MetaStore = &h.MetaStore h.Handler.MetaStore = &h.MetaStore
h.Handler.QueryExecutor = &h.QueryExecutor h.Handler.QueryExecutor = &h.QueryExecutor

View File

@ -2,12 +2,29 @@ package httpd
import ( import (
"crypto/tls" "crypto/tls"
"expvar"
"fmt" "fmt"
"log" "log"
"net" "net"
"net/http" "net/http"
"os" "os"
"strings" "strings"
"github.com/influxdb/influxdb"
)
// statistics gathered by the httpd package.
const (
statRequest = "req" // Number of HTTP requests served
statCQRequest = "cq_req" // Number of CQ-execute requests served
statQueryRequest = "query_req" // Number of query requests served
statWriteRequest = "write_req" // Number of write requests serverd
statPingRequest = "ping_req" // Number of ping requests served
statWriteRequestBytesReceived = "write_req_bytes" // Sum of all bytes in write requests
statQueryRequestBytesTransmitted = "query_resp_bytes" // Sum of all bytes returned in query reponses
statPointsWrittenOK = "points_written_ok" // Number of points written OK
statPointsWrittenFail = "points_written_fail" // Number of points that failed to be written
statAuthFail = "auth_fail" // Number of authentication failures
) )
// Service manages the listener and handler for an HTTP endpoint. // Service manages the listener and handler for an HTTP endpoint.
@ -20,11 +37,18 @@ type Service struct {
Handler *Handler Handler *Handler
Logger *log.Logger Logger *log.Logger
statMap *expvar.Map
} }
// NewService returns a new instance of Service. // NewService returns a new instance of Service.
func NewService(c Config) *Service { func NewService(c Config) *Service {
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
// should be done before any data could arrive for the service.
key := strings.Join([]string{"httpd", c.BindAddress}, ":")
tags := map[string]string{"bind": c.BindAddress}
statMap := influxdb.NewStatistics(key, "httpd", tags)
s := &Service{ s := &Service{
addr: c.BindAddress, addr: c.BindAddress,
https: c.HttpsEnabled, https: c.HttpsEnabled,
@ -34,6 +58,7 @@ func NewService(c Config) *Service {
c.AuthEnabled, c.AuthEnabled,
c.LogEnabled, c.LogEnabled,
c.WriteTracing, c.WriteTracing,
statMap,
), ),
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags), Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
} }