Add client and server status code stats
parent
046561181c
commit
36480c6271
|
@ -202,12 +202,24 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
h.statMap.Add(statRequestDuration, time.Since(start).Nanoseconds())
|
h.statMap.Add(statRequestDuration, time.Since(start).Nanoseconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// writeHeader writes the provided status code in the response, and
|
||||||
|
// updates relevant http error statistics.
|
||||||
|
func (h *Handler) writeHeader(w http.ResponseWriter, code int) {
|
||||||
|
switch code / 100 {
|
||||||
|
case 4:
|
||||||
|
h.statMap.Add(statClientError, 1)
|
||||||
|
case 5:
|
||||||
|
h.statMap.Add(statServerError, 1)
|
||||||
|
}
|
||||||
|
w.WriteHeader(code)
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
||||||
h.statMap.Add(statCQRequest, 1)
|
h.statMap.Add(statCQRequest, 1)
|
||||||
|
|
||||||
// If the continuous query service isn't configured, return 404.
|
// If the continuous query service isn't configured, return 404.
|
||||||
if h.ContinuousQuerier == nil {
|
if h.ContinuousQuerier == nil {
|
||||||
w.WriteHeader(http.StatusNotImplemented)
|
h.writeHeader(w, http.StatusNotImplemented)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,7 +239,7 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R
|
||||||
// Try parsing as an int64 nanosecond timestamp.
|
// Try parsing as an int64 nanosecond timestamp.
|
||||||
i, err := strconv.ParseInt(s, 10, 64)
|
i, err := strconv.ParseInt(s, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
h.writeHeader(w, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t = time.Unix(0, i)
|
t = time.Unix(0, i)
|
||||||
|
@ -236,11 +248,11 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R
|
||||||
|
|
||||||
// Pass the request to the CQ service.
|
// Pass the request to the CQ service.
|
||||||
if err := h.ContinuousQuerier.Run(db, name, t); err != nil {
|
if err := h.ContinuousQuerier.Run(db, name, t); err != nil {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
h.writeHeader(w, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
w.WriteHeader(http.StatusNoContent)
|
h.writeHeader(w, http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
// serveQuery parses an incoming query and, if valid, executes the query.
|
// serveQuery parses an incoming query and, if valid, executes the query.
|
||||||
|
@ -254,7 +266,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
|
|
||||||
qp := strings.TrimSpace(r.FormValue("q"))
|
qp := strings.TrimSpace(r.FormValue("q"))
|
||||||
if qp == "" {
|
if qp == "" {
|
||||||
httpError(w, `missing required parameter "q"`, pretty, http.StatusBadRequest)
|
h.httpError(w, `missing required parameter "q"`, pretty, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,7 +282,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
// Parse query from query string.
|
// Parse query from query string.
|
||||||
query, err := p.ParseQuery()
|
query, err := p.ParseQuery()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest)
|
h.httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -280,7 +292,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
if err, ok := err.(meta.ErrAuthorize); ok {
|
if err, ok := err.(meta.ErrAuthorize); ok {
|
||||||
h.Logger.Printf("unauthorized request | user: %q | query: %q | database %q\n", err.User, err.Query.String(), err.Database)
|
h.Logger.Printf("unauthorized request | user: %q | query: %q | database %q\n", err.User, err.Query.String(), err.Database)
|
||||||
}
|
}
|
||||||
httpError(w, "error authorizing query: "+err.Error(), pretty, http.StatusUnauthorized)
|
h.httpError(w, "error authorizing query: "+err.Error(), pretty, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -327,7 +339,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
resp := Response{Results: make([]*influxql.Result, 0)}
|
resp := Response{Results: make([]*influxql.Result, 0)}
|
||||||
|
|
||||||
// Status header is OK once this point is reached.
|
// Status header is OK once this point is reached.
|
||||||
w.WriteHeader(http.StatusOK)
|
h.writeHeader(w, http.StatusOK)
|
||||||
|
|
||||||
// pull all results from the channel
|
// pull all results from the channel
|
||||||
rows := 0
|
rows := 0
|
||||||
|
@ -418,23 +430,23 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
|
|
||||||
database := r.URL.Query().Get("db")
|
database := r.URL.Query().Get("db")
|
||||||
if database == "" {
|
if database == "" {
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
|
h.resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if di := h.MetaClient.Database(database); di == nil {
|
if di := h.MetaClient.Database(database); di == nil {
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
|
h.resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if h.Config.AuthEnabled && user == nil {
|
if h.Config.AuthEnabled && user == nil {
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
|
h.resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if h.Config.AuthEnabled {
|
if h.Config.AuthEnabled {
|
||||||
if err := h.WriteAuthorizer.AuthorizeWrite(user.Name, database); err != nil {
|
if err := h.WriteAuthorizer.AuthorizeWrite(user.Name, database); err != nil {
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
|
h.resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -444,7 +456,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
if r.Header.Get("Content-encoding") == "gzip" {
|
if r.Header.Get("Content-encoding") == "gzip" {
|
||||||
b, err := gzip.NewReader(r.Body)
|
b, err := gzip.NewReader(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer b.Close()
|
defer b.Close()
|
||||||
|
@ -466,7 +478,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
if h.Config.WriteTracing {
|
if h.Config.WriteTracing {
|
||||||
h.Logger.Print("write handler unable to read bytes from request body")
|
h.Logger.Print("write handler unable to read bytes from request body")
|
||||||
}
|
}
|
||||||
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
h.statMap.Add(statWriteRequestBytesReceived, int64(buf.Len()))
|
h.statMap.Add(statWriteRequestBytesReceived, int64(buf.Len()))
|
||||||
|
@ -479,10 +491,10 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
// Not points parsed correctly so return the error now
|
// Not points parsed correctly so return the error now
|
||||||
if parseError != nil && len(points) == 0 {
|
if parseError != nil && len(points) == 0 {
|
||||||
if parseError.Error() == "EOF" {
|
if parseError.Error() == "EOF" {
|
||||||
w.WriteHeader(http.StatusOK)
|
h.writeHeader(w, http.StatusOK)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resultError(w, influxql.Result{Err: parseError}, http.StatusBadRequest)
|
h.resultError(w, influxql.Result{Err: parseError}, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -493,7 +505,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
var err error
|
var err error
|
||||||
consistency, err = models.ParseConsistencyLevel(level)
|
consistency, err = models.ParseConsistencyLevel(level)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -501,41 +513,41 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
// Write points.
|
// Write points.
|
||||||
if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, points); influxdb.IsClientError(err) {
|
if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, points); influxdb.IsClientError(err) {
|
||||||
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
|
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
|
||||||
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
|
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
|
||||||
resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
|
h.resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
} else if parseError != nil {
|
} else if parseError != nil {
|
||||||
// We wrote some of the points
|
// We wrote some of the points
|
||||||
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
|
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
|
||||||
// The other points failed to parse which means the client sent invalid line protocol. We return a 400
|
// The other points failed to parse which means the client sent invalid line protocol. We return a 400
|
||||||
// response code as well as the lines that failed to parse.
|
// response code as well as the lines that failed to parse.
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("partial write:\n%v", parseError)}, http.StatusBadRequest)
|
h.resultError(w, influxql.Result{Err: fmt.Errorf("partial write:\n%v", parseError)}, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
|
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
|
||||||
w.WriteHeader(http.StatusNoContent)
|
h.writeHeader(w, http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
// serveOptions returns an empty response to comply with OPTIONS pre-flight requests
|
// serveOptions returns an empty response to comply with OPTIONS pre-flight requests
|
||||||
func (h *Handler) serveOptions(w http.ResponseWriter, r *http.Request) {
|
func (h *Handler) serveOptions(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusNoContent)
|
h.writeHeader(w, http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
// servePing returns a simple response to let the client know the server is running.
|
// servePing returns a simple response to let the client know the server is running.
|
||||||
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
|
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
|
||||||
h.statMap.Add(statPingRequest, 1)
|
h.statMap.Add(statPingRequest, 1)
|
||||||
w.WriteHeader(http.StatusNoContent)
|
h.writeHeader(w, http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
// serveStatus has been depricated
|
// serveStatus has been depricated
|
||||||
func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
|
func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
|
||||||
h.Logger.Printf("WARNING: /status has been depricated. Use /ping instead.")
|
h.Logger.Printf("WARNING: /status has been depricated. Use /ping instead.")
|
||||||
h.statMap.Add(statStatusRequest, 1)
|
h.statMap.Add(statStatusRequest, 1)
|
||||||
w.WriteHeader(http.StatusNoContent)
|
h.writeHeader(w, http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
// convertToEpoch converts result timestamps from time.Time to the specified epoch.
|
// convertToEpoch converts result timestamps from time.Time to the specified epoch.
|
||||||
|
@ -595,10 +607,10 @@ func serveExpvar(w http.ResponseWriter, r *http.Request) {
|
||||||
fmt.Fprintf(w, "\n}\n")
|
fmt.Fprintf(w, "\n}\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
// httpError writes an error to the client in a standard format.
|
// h.httpError writes an error to the client in a standard format.
|
||||||
func httpError(w http.ResponseWriter, error string, pretty bool, code int) {
|
func (h *Handler) httpError(w http.ResponseWriter, error string, pretty bool, code int) {
|
||||||
w.Header().Add("content-type", "application/json")
|
w.Header().Add("content-type", "application/json")
|
||||||
w.WriteHeader(code)
|
h.writeHeader(w, code)
|
||||||
|
|
||||||
response := Response{Err: errors.New(error)}
|
response := Response{Err: errors.New(error)}
|
||||||
var b []byte
|
var b []byte
|
||||||
|
@ -610,9 +622,9 @@ func httpError(w http.ResponseWriter, error string, pretty bool, code int) {
|
||||||
w.Write(b)
|
w.Write(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func resultError(w http.ResponseWriter, result influxql.Result, code int) {
|
func (h *Handler) resultError(w http.ResponseWriter, result influxql.Result, code int) {
|
||||||
w.Header().Add("content-type", "application/json")
|
w.Header().Add("content-type", "application/json")
|
||||||
w.WriteHeader(code)
|
h.writeHeader(w, code)
|
||||||
_ = json.NewEncoder(w).Encode(&result)
|
_ = json.NewEncoder(w).Encode(&result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -698,7 +710,7 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
|
||||||
creds, err := parseCredentials(r)
|
creds, err := parseCredentials(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.statMap.Add(statAuthFail, 1)
|
h.statMap.Add(statAuthFail, 1)
|
||||||
httpError(w, err.Error(), false, http.StatusUnauthorized)
|
h.httpError(w, err.Error(), false, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -706,14 +718,14 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
|
||||||
case UserAuthentication:
|
case UserAuthentication:
|
||||||
if creds.Username == "" {
|
if creds.Username == "" {
|
||||||
h.statMap.Add(statAuthFail, 1)
|
h.statMap.Add(statAuthFail, 1)
|
||||||
httpError(w, "username required", false, http.StatusUnauthorized)
|
h.httpError(w, "username required", false, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
user, err = h.MetaClient.Authenticate(creds.Username, creds.Password)
|
user, err = h.MetaClient.Authenticate(creds.Username, creds.Password)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.statMap.Add(statAuthFail, 1)
|
h.statMap.Add(statAuthFail, 1)
|
||||||
httpError(w, "authorization failed", false, http.StatusUnauthorized)
|
h.httpError(w, "authorization failed", false, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case BearerAuthentication:
|
case BearerAuthentication:
|
||||||
|
@ -728,39 +740,39 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
|
||||||
// Parse and validate the token.
|
// Parse and validate the token.
|
||||||
token, err := jwt.Parse(creds.Token, keyLookupFn)
|
token, err := jwt.Parse(creds.Token, keyLookupFn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpError(w, err.Error(), false, http.StatusUnauthorized)
|
h.httpError(w, err.Error(), false, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
} else if !token.Valid {
|
} else if !token.Valid {
|
||||||
httpError(w, "invalid token", false, http.StatusUnauthorized)
|
h.httpError(w, "invalid token", false, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure an expiration was set on the token.
|
// Make sure an expiration was set on the token.
|
||||||
if exp, ok := token.Claims["exp"].(float64); !ok || exp <= 0.0 {
|
if exp, ok := token.Claims["exp"].(float64); !ok || exp <= 0.0 {
|
||||||
httpError(w, "token expiration required", false, http.StatusUnauthorized)
|
h.httpError(w, "token expiration required", false, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the username from the token.
|
// Get the username from the token.
|
||||||
username, ok := token.Claims["username"].(string)
|
username, ok := token.Claims["username"].(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
httpError(w, "username in token must be a string", false, http.StatusUnauthorized)
|
h.httpError(w, "username in token must be a string", false, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
} else if username == "" {
|
} else if username == "" {
|
||||||
httpError(w, "token must contain a username", false, http.StatusUnauthorized)
|
h.httpError(w, "token must contain a username", false, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lookup user in the metastore.
|
// Lookup user in the metastore.
|
||||||
if user, err = h.MetaClient.User(username); err != nil {
|
if user, err = h.MetaClient.User(username); err != nil {
|
||||||
httpError(w, err.Error(), false, http.StatusUnauthorized)
|
h.httpError(w, err.Error(), false, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
} else if user == nil {
|
} else if user == nil {
|
||||||
httpError(w, meta.ErrUserNotFound.Error(), false, http.StatusUnauthorized)
|
h.httpError(w, meta.ErrUserNotFound.Error(), false, http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
httpError(w, "unsupported authentication", false, http.StatusUnauthorized)
|
h.httpError(w, "unsupported authentication", false, http.StatusUnauthorized)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,8 @@ const (
|
||||||
statQueryRequestDuration = "queryReqDurationNs" // Number of (wall-time) nanoseconds spent inside query requests
|
statQueryRequestDuration = "queryReqDurationNs" // Number of (wall-time) nanoseconds spent inside query requests
|
||||||
statWriteRequestDuration = "writeReqDurationNs" // Number of (wall-time) nanoseconds spent inside write requests
|
statWriteRequestDuration = "writeReqDurationNs" // Number of (wall-time) nanoseconds spent inside write requests
|
||||||
statRequestsActive = "reqActive" // Number of currently active requests
|
statRequestsActive = "reqActive" // Number of currently active requests
|
||||||
|
statClientError = "clientError" // Number of HTTP responses due to client error
|
||||||
|
statServerError = "serverError" // Number of HTTP responses due to server error
|
||||||
)
|
)
|
||||||
|
|
||||||
// Service manages the listener and handler for an HTTP endpoint.
|
// Service manages the listener and handler for an HTTP endpoint.
|
||||||
|
|
Loading…
Reference in New Issue