From 36480c6271ea011dabdd7255297c215f0b27797a Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 17 May 2016 16:57:20 +0100 Subject: [PATCH 1/2] Add client and server status code stats --- services/httpd/handler.go | 92 ++++++++++++++++++++++----------------- services/httpd/service.go | 2 + 2 files changed, 54 insertions(+), 40 deletions(-) diff --git a/services/httpd/handler.go b/services/httpd/handler.go index d2cef6b406..842d90fd33 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -202,12 +202,24 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.statMap.Add(statRequestDuration, time.Since(start).Nanoseconds()) } +// writeHeader writes the provided status code in the response, and +// updates relevant http error statistics. +func (h *Handler) writeHeader(w http.ResponseWriter, code int) { + switch code / 100 { + case 4: + h.statMap.Add(statClientError, 1) + case 5: + h.statMap.Add(statServerError, 1) + } + w.WriteHeader(code) +} + func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { h.statMap.Add(statCQRequest, 1) // If the continuous query service isn't configured, return 404. if h.ContinuousQuerier == nil { - w.WriteHeader(http.StatusNotImplemented) + h.writeHeader(w, http.StatusNotImplemented) return } @@ -227,7 +239,7 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R // Try parsing as an int64 nanosecond timestamp. i, err := strconv.ParseInt(s, 10, 64) if err != nil { - w.WriteHeader(http.StatusBadRequest) + h.writeHeader(w, http.StatusBadRequest) return } t = time.Unix(0, i) @@ -236,11 +248,11 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R // Pass the request to the CQ service. if err := h.ContinuousQuerier.Run(db, name, t); err != nil { - w.WriteHeader(http.StatusBadRequest) + h.writeHeader(w, http.StatusBadRequest) return } - w.WriteHeader(http.StatusNoContent) + h.writeHeader(w, http.StatusNoContent) } // serveQuery parses an incoming query and, if valid, executes the query. @@ -254,7 +266,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. qp := strings.TrimSpace(r.FormValue("q")) if qp == "" { - httpError(w, `missing required parameter "q"`, pretty, http.StatusBadRequest) + h.httpError(w, `missing required parameter "q"`, pretty, http.StatusBadRequest) return } @@ -270,7 +282,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. // Parse query from query string. query, err := p.ParseQuery() if err != nil { - httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest) + h.httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest) return } @@ -280,7 +292,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. if err, ok := err.(meta.ErrAuthorize); ok { h.Logger.Printf("unauthorized request | user: %q | query: %q | database %q\n", err.User, err.Query.String(), err.Database) } - httpError(w, "error authorizing query: "+err.Error(), pretty, http.StatusUnauthorized) + h.httpError(w, "error authorizing query: "+err.Error(), pretty, http.StatusUnauthorized) return } } @@ -327,7 +339,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. resp := Response{Results: make([]*influxql.Result, 0)} // Status header is OK once this point is reached. - w.WriteHeader(http.StatusOK) + h.writeHeader(w, http.StatusOK) // pull all results from the channel rows := 0 @@ -418,23 +430,23 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta. database := r.URL.Query().Get("db") if database == "" { - resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest) + h.resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest) return } if di := h.MetaClient.Database(database); di == nil { - resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound) + h.resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound) return } if h.Config.AuthEnabled && user == nil { - resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized) + h.resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized) return } if h.Config.AuthEnabled { if err := h.WriteAuthorizer.AuthorizeWrite(user.Name, database); err != nil { - resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized) + h.resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized) return } } @@ -444,7 +456,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta. if r.Header.Get("Content-encoding") == "gzip" { b, err := gzip.NewReader(r.Body) if err != nil { - resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) + h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) return } defer b.Close() @@ -466,7 +478,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta. if h.Config.WriteTracing { h.Logger.Print("write handler unable to read bytes from request body") } - resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) + h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) return } h.statMap.Add(statWriteRequestBytesReceived, int64(buf.Len())) @@ -479,10 +491,10 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta. // Not points parsed correctly so return the error now if parseError != nil && len(points) == 0 { if parseError.Error() == "EOF" { - w.WriteHeader(http.StatusOK) + h.writeHeader(w, http.StatusOK) return } - resultError(w, influxql.Result{Err: parseError}, http.StatusBadRequest) + h.resultError(w, influxql.Result{Err: parseError}, http.StatusBadRequest) return } @@ -493,7 +505,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta. var err error consistency, err = models.ParseConsistencyLevel(level) if err != nil { - resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) + h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) return } } @@ -501,41 +513,41 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta. // Write points. if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, points); influxdb.IsClientError(err) { h.statMap.Add(statPointsWrittenFail, int64(len(points))) - resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) + h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) return } else if err != nil { h.statMap.Add(statPointsWrittenFail, int64(len(points))) - resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError) + h.resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError) return } else if parseError != nil { // We wrote some of the points h.statMap.Add(statPointsWrittenOK, int64(len(points))) // The other points failed to parse which means the client sent invalid line protocol. We return a 400 // response code as well as the lines that failed to parse. - resultError(w, influxql.Result{Err: fmt.Errorf("partial write:\n%v", parseError)}, http.StatusBadRequest) + h.resultError(w, influxql.Result{Err: fmt.Errorf("partial write:\n%v", parseError)}, http.StatusBadRequest) return } h.statMap.Add(statPointsWrittenOK, int64(len(points))) - w.WriteHeader(http.StatusNoContent) + h.writeHeader(w, http.StatusNoContent) } // serveOptions returns an empty response to comply with OPTIONS pre-flight requests func (h *Handler) serveOptions(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNoContent) + h.writeHeader(w, http.StatusNoContent) } // servePing returns a simple response to let the client know the server is running. func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) { h.statMap.Add(statPingRequest, 1) - w.WriteHeader(http.StatusNoContent) + h.writeHeader(w, http.StatusNoContent) } // serveStatus has been depricated func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) { h.Logger.Printf("WARNING: /status has been depricated. Use /ping instead.") h.statMap.Add(statStatusRequest, 1) - w.WriteHeader(http.StatusNoContent) + h.writeHeader(w, http.StatusNoContent) } // convertToEpoch converts result timestamps from time.Time to the specified epoch. @@ -595,10 +607,10 @@ func serveExpvar(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "\n}\n") } -// httpError writes an error to the client in a standard format. -func httpError(w http.ResponseWriter, error string, pretty bool, code int) { +// h.httpError writes an error to the client in a standard format. +func (h *Handler) httpError(w http.ResponseWriter, error string, pretty bool, code int) { w.Header().Add("content-type", "application/json") - w.WriteHeader(code) + h.writeHeader(w, code) response := Response{Err: errors.New(error)} var b []byte @@ -610,9 +622,9 @@ func httpError(w http.ResponseWriter, error string, pretty bool, code int) { w.Write(b) } -func resultError(w http.ResponseWriter, result influxql.Result, code int) { +func (h *Handler) resultError(w http.ResponseWriter, result influxql.Result, code int) { w.Header().Add("content-type", "application/json") - w.WriteHeader(code) + h.writeHeader(w, code) _ = json.NewEncoder(w).Encode(&result) } @@ -698,7 +710,7 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo) creds, err := parseCredentials(r) if err != nil { h.statMap.Add(statAuthFail, 1) - httpError(w, err.Error(), false, http.StatusUnauthorized) + h.httpError(w, err.Error(), false, http.StatusUnauthorized) return } @@ -706,14 +718,14 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo) case UserAuthentication: if creds.Username == "" { h.statMap.Add(statAuthFail, 1) - httpError(w, "username required", false, http.StatusUnauthorized) + h.httpError(w, "username required", false, http.StatusUnauthorized) return } user, err = h.MetaClient.Authenticate(creds.Username, creds.Password) if err != nil { h.statMap.Add(statAuthFail, 1) - httpError(w, "authorization failed", false, http.StatusUnauthorized) + h.httpError(w, "authorization failed", false, http.StatusUnauthorized) return } case BearerAuthentication: @@ -728,39 +740,39 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo) // Parse and validate the token. token, err := jwt.Parse(creds.Token, keyLookupFn) if err != nil { - httpError(w, err.Error(), false, http.StatusUnauthorized) + h.httpError(w, err.Error(), false, http.StatusUnauthorized) return } else if !token.Valid { - httpError(w, "invalid token", false, http.StatusUnauthorized) + h.httpError(w, "invalid token", false, http.StatusUnauthorized) return } // Make sure an expiration was set on the token. if exp, ok := token.Claims["exp"].(float64); !ok || exp <= 0.0 { - httpError(w, "token expiration required", false, http.StatusUnauthorized) + h.httpError(w, "token expiration required", false, http.StatusUnauthorized) return } // Get the username from the token. username, ok := token.Claims["username"].(string) if !ok { - httpError(w, "username in token must be a string", false, http.StatusUnauthorized) + h.httpError(w, "username in token must be a string", false, http.StatusUnauthorized) return } else if username == "" { - httpError(w, "token must contain a username", false, http.StatusUnauthorized) + h.httpError(w, "token must contain a username", false, http.StatusUnauthorized) return } // Lookup user in the metastore. if user, err = h.MetaClient.User(username); err != nil { - httpError(w, err.Error(), false, http.StatusUnauthorized) + h.httpError(w, err.Error(), false, http.StatusUnauthorized) return } else if user == nil { - httpError(w, meta.ErrUserNotFound.Error(), false, http.StatusUnauthorized) + h.httpError(w, meta.ErrUserNotFound.Error(), false, http.StatusUnauthorized) return } default: - httpError(w, "unsupported authentication", false, http.StatusUnauthorized) + h.httpError(w, "unsupported authentication", false, http.StatusUnauthorized) } } diff --git a/services/httpd/service.go b/services/httpd/service.go index 59d631f063..5d4483f5de 100644 --- a/services/httpd/service.go +++ b/services/httpd/service.go @@ -32,6 +32,8 @@ const ( statQueryRequestDuration = "queryReqDurationNs" // Number of (wall-time) nanoseconds spent inside query requests statWriteRequestDuration = "writeReqDurationNs" // Number of (wall-time) nanoseconds spent inside write requests statRequestsActive = "reqActive" // Number of currently active requests + statClientError = "clientError" // Number of HTTP responses due to client error + statServerError = "serverError" // Number of HTTP responses due to server error ) // Service manages the listener and handler for an HTTP endpoint. From ce0064cd889d5ff2a40f4e5ac3620c3487208c56 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 17 May 2016 17:22:57 +0100 Subject: [PATCH 2/2] Add stat for currently active write requests --- CHANGELOG.md | 2 ++ services/httpd/handler.go | 2 ++ services/httpd/service.go | 1 + 3 files changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 372bbebf50..6d62ff9a2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ - [#6559](https://github.com/influxdata/influxdb/issues/6559): Teach the http service how to enforce connection limits. - [#6623](https://github.com/influxdata/influxdb/pull/6623): Speed up drop database - [#6519](https://github.com/influxdata/influxdb/issues/6519): Support cast syntax for selecting a specific type. +- [#6654](https://github.com/influxdata/influxdb/pull/6654): Add new HTTP statistics to monitoring + ### Bugfixes diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 842d90fd33..a054a4808e 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -424,7 +424,9 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. // serveWrite receives incoming series data in line protocol format and writes it to the database. func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { h.statMap.Add(statWriteRequest, 1) + h.statMap.Add(statWriteRequestsActive, 1) defer func(start time.Time) { + h.statMap.Add(statWriteRequestsActive, -1) h.statMap.Add(statWriteRequestDuration, time.Since(start).Nanoseconds()) }(time.Now()) diff --git a/services/httpd/service.go b/services/httpd/service.go index 5d4483f5de..84bcd827ed 100644 --- a/services/httpd/service.go +++ b/services/httpd/service.go @@ -32,6 +32,7 @@ const ( statQueryRequestDuration = "queryReqDurationNs" // Number of (wall-time) nanoseconds spent inside query requests statWriteRequestDuration = "writeReqDurationNs" // Number of (wall-time) nanoseconds spent inside write requests statRequestsActive = "reqActive" // Number of currently active requests + statWriteRequestsActive = "writeReqActive" // Number of currently active write requests statClientError = "clientError" // Number of HTTP responses due to client error statServerError = "serverError" // Number of HTTP responses due to server error )