commit
887e4ba4d4
|
@ -11,6 +11,8 @@
|
|||
- [#6559](https://github.com/influxdata/influxdb/issues/6559): Teach the http service how to enforce connection limits.
|
||||
- [#6623](https://github.com/influxdata/influxdb/pull/6623): Speed up drop database
|
||||
- [#6519](https://github.com/influxdata/influxdb/issues/6519): Support cast syntax for selecting a specific type.
|
||||
- [#6654](https://github.com/influxdata/influxdb/pull/6654): Add new HTTP statistics to monitoring
|
||||
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -202,12 +202,24 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
h.statMap.Add(statRequestDuration, time.Since(start).Nanoseconds())
|
||||
}
|
||||
|
||||
// writeHeader writes the provided status code in the response, and
|
||||
// updates relevant http error statistics.
|
||||
func (h *Handler) writeHeader(w http.ResponseWriter, code int) {
|
||||
switch code / 100 {
|
||||
case 4:
|
||||
h.statMap.Add(statClientError, 1)
|
||||
case 5:
|
||||
h.statMap.Add(statServerError, 1)
|
||||
}
|
||||
w.WriteHeader(code)
|
||||
}
|
||||
|
||||
func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
||||
h.statMap.Add(statCQRequest, 1)
|
||||
|
||||
// If the continuous query service isn't configured, return 404.
|
||||
if h.ContinuousQuerier == nil {
|
||||
w.WriteHeader(http.StatusNotImplemented)
|
||||
h.writeHeader(w, http.StatusNotImplemented)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -227,7 +239,7 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R
|
|||
// Try parsing as an int64 nanosecond timestamp.
|
||||
i, err := strconv.ParseInt(s, 10, 64)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
h.writeHeader(w, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
t = time.Unix(0, i)
|
||||
|
@ -236,11 +248,11 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R
|
|||
|
||||
// Pass the request to the CQ service.
|
||||
if err := h.ContinuousQuerier.Run(db, name, t); err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
h.writeHeader(w, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
h.writeHeader(w, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// serveQuery parses an incoming query and, if valid, executes the query.
|
||||
|
@ -254,7 +266,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
|
||||
qp := strings.TrimSpace(r.FormValue("q"))
|
||||
if qp == "" {
|
||||
httpError(w, `missing required parameter "q"`, pretty, http.StatusBadRequest)
|
||||
h.httpError(w, `missing required parameter "q"`, pretty, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -270,7 +282,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
// Parse query from query string.
|
||||
query, err := p.ParseQuery()
|
||||
if err != nil {
|
||||
httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest)
|
||||
h.httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -280,7 +292,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
if err, ok := err.(meta.ErrAuthorize); ok {
|
||||
h.Logger.Printf("unauthorized request | user: %q | query: %q | database %q\n", err.User, err.Query.String(), err.Database)
|
||||
}
|
||||
httpError(w, "error authorizing query: "+err.Error(), pretty, http.StatusUnauthorized)
|
||||
h.httpError(w, "error authorizing query: "+err.Error(), pretty, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -327,7 +339,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
resp := Response{Results: make([]*influxql.Result, 0)}
|
||||
|
||||
// Status header is OK once this point is reached.
|
||||
w.WriteHeader(http.StatusOK)
|
||||
h.writeHeader(w, http.StatusOK)
|
||||
|
||||
// pull all results from the channel
|
||||
rows := 0
|
||||
|
@ -412,29 +424,31 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
// serveWrite receives incoming series data in line protocol format and writes it to the database.
|
||||
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
||||
h.statMap.Add(statWriteRequest, 1)
|
||||
h.statMap.Add(statWriteRequestsActive, 1)
|
||||
defer func(start time.Time) {
|
||||
h.statMap.Add(statWriteRequestsActive, -1)
|
||||
h.statMap.Add(statWriteRequestDuration, time.Since(start).Nanoseconds())
|
||||
}(time.Now())
|
||||
|
||||
database := r.URL.Query().Get("db")
|
||||
if database == "" {
|
||||
resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
|
||||
h.resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if di := h.MetaClient.Database(database); di == nil {
|
||||
resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
|
||||
h.resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
if h.Config.AuthEnabled && user == nil {
|
||||
resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
|
||||
h.resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
if h.Config.AuthEnabled {
|
||||
if err := h.WriteAuthorizer.AuthorizeWrite(user.Name, database); err != nil {
|
||||
resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
|
||||
h.resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -444,7 +458,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
if r.Header.Get("Content-encoding") == "gzip" {
|
||||
b, err := gzip.NewReader(r.Body)
|
||||
if err != nil {
|
||||
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||
h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
defer b.Close()
|
||||
|
@ -466,7 +480,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
if h.Config.WriteTracing {
|
||||
h.Logger.Print("write handler unable to read bytes from request body")
|
||||
}
|
||||
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||
h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
h.statMap.Add(statWriteRequestBytesReceived, int64(buf.Len()))
|
||||
|
@ -479,10 +493,10 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
// Not points parsed correctly so return the error now
|
||||
if parseError != nil && len(points) == 0 {
|
||||
if parseError.Error() == "EOF" {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
h.writeHeader(w, http.StatusOK)
|
||||
return
|
||||
}
|
||||
resultError(w, influxql.Result{Err: parseError}, http.StatusBadRequest)
|
||||
h.resultError(w, influxql.Result{Err: parseError}, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -493,7 +507,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
var err error
|
||||
consistency, err = models.ParseConsistencyLevel(level)
|
||||
if err != nil {
|
||||
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||
h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -501,41 +515,41 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
// Write points.
|
||||
if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, points); influxdb.IsClientError(err) {
|
||||
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
|
||||
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||
h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||
return
|
||||
} else if err != nil {
|
||||
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
|
||||
resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
|
||||
h.resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
|
||||
return
|
||||
} else if parseError != nil {
|
||||
// We wrote some of the points
|
||||
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
|
||||
// The other points failed to parse which means the client sent invalid line protocol. We return a 400
|
||||
// response code as well as the lines that failed to parse.
|
||||
resultError(w, influxql.Result{Err: fmt.Errorf("partial write:\n%v", parseError)}, http.StatusBadRequest)
|
||||
h.resultError(w, influxql.Result{Err: fmt.Errorf("partial write:\n%v", parseError)}, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
h.writeHeader(w, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// serveOptions returns an empty response to comply with OPTIONS pre-flight requests
|
||||
func (h *Handler) serveOptions(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
h.writeHeader(w, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// servePing returns a simple response to let the client know the server is running.
|
||||
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
|
||||
h.statMap.Add(statPingRequest, 1)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
h.writeHeader(w, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// serveStatus has been depricated
|
||||
func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
|
||||
h.Logger.Printf("WARNING: /status has been depricated. Use /ping instead.")
|
||||
h.statMap.Add(statStatusRequest, 1)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
h.writeHeader(w, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// convertToEpoch converts result timestamps from time.Time to the specified epoch.
|
||||
|
@ -595,10 +609,10 @@ func serveExpvar(w http.ResponseWriter, r *http.Request) {
|
|||
fmt.Fprintf(w, "\n}\n")
|
||||
}
|
||||
|
||||
// httpError writes an error to the client in a standard format.
|
||||
func httpError(w http.ResponseWriter, error string, pretty bool, code int) {
|
||||
// h.httpError writes an error to the client in a standard format.
|
||||
func (h *Handler) httpError(w http.ResponseWriter, error string, pretty bool, code int) {
|
||||
w.Header().Add("content-type", "application/json")
|
||||
w.WriteHeader(code)
|
||||
h.writeHeader(w, code)
|
||||
|
||||
response := Response{Err: errors.New(error)}
|
||||
var b []byte
|
||||
|
@ -610,9 +624,9 @@ func httpError(w http.ResponseWriter, error string, pretty bool, code int) {
|
|||
w.Write(b)
|
||||
}
|
||||
|
||||
func resultError(w http.ResponseWriter, result influxql.Result, code int) {
|
||||
func (h *Handler) resultError(w http.ResponseWriter, result influxql.Result, code int) {
|
||||
w.Header().Add("content-type", "application/json")
|
||||
w.WriteHeader(code)
|
||||
h.writeHeader(w, code)
|
||||
_ = json.NewEncoder(w).Encode(&result)
|
||||
}
|
||||
|
||||
|
@ -698,7 +712,7 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
|
|||
creds, err := parseCredentials(r)
|
||||
if err != nil {
|
||||
h.statMap.Add(statAuthFail, 1)
|
||||
httpError(w, err.Error(), false, http.StatusUnauthorized)
|
||||
h.httpError(w, err.Error(), false, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -706,14 +720,14 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
|
|||
case UserAuthentication:
|
||||
if creds.Username == "" {
|
||||
h.statMap.Add(statAuthFail, 1)
|
||||
httpError(w, "username required", false, http.StatusUnauthorized)
|
||||
h.httpError(w, "username required", false, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
user, err = h.MetaClient.Authenticate(creds.Username, creds.Password)
|
||||
if err != nil {
|
||||
h.statMap.Add(statAuthFail, 1)
|
||||
httpError(w, "authorization failed", false, http.StatusUnauthorized)
|
||||
h.httpError(w, "authorization failed", false, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
case BearerAuthentication:
|
||||
|
@ -728,39 +742,39 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
|
|||
// Parse and validate the token.
|
||||
token, err := jwt.Parse(creds.Token, keyLookupFn)
|
||||
if err != nil {
|
||||
httpError(w, err.Error(), false, http.StatusUnauthorized)
|
||||
h.httpError(w, err.Error(), false, http.StatusUnauthorized)
|
||||
return
|
||||
} else if !token.Valid {
|
||||
httpError(w, "invalid token", false, http.StatusUnauthorized)
|
||||
h.httpError(w, "invalid token", false, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
// Make sure an expiration was set on the token.
|
||||
if exp, ok := token.Claims["exp"].(float64); !ok || exp <= 0.0 {
|
||||
httpError(w, "token expiration required", false, http.StatusUnauthorized)
|
||||
h.httpError(w, "token expiration required", false, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
// Get the username from the token.
|
||||
username, ok := token.Claims["username"].(string)
|
||||
if !ok {
|
||||
httpError(w, "username in token must be a string", false, http.StatusUnauthorized)
|
||||
h.httpError(w, "username in token must be a string", false, http.StatusUnauthorized)
|
||||
return
|
||||
} else if username == "" {
|
||||
httpError(w, "token must contain a username", false, http.StatusUnauthorized)
|
||||
h.httpError(w, "token must contain a username", false, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
// Lookup user in the metastore.
|
||||
if user, err = h.MetaClient.User(username); err != nil {
|
||||
httpError(w, err.Error(), false, http.StatusUnauthorized)
|
||||
h.httpError(w, err.Error(), false, http.StatusUnauthorized)
|
||||
return
|
||||
} else if user == nil {
|
||||
httpError(w, meta.ErrUserNotFound.Error(), false, http.StatusUnauthorized)
|
||||
h.httpError(w, meta.ErrUserNotFound.Error(), false, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
default:
|
||||
httpError(w, "unsupported authentication", false, http.StatusUnauthorized)
|
||||
h.httpError(w, "unsupported authentication", false, http.StatusUnauthorized)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -32,6 +32,9 @@ const (
|
|||
statQueryRequestDuration = "queryReqDurationNs" // Number of (wall-time) nanoseconds spent inside query requests
|
||||
statWriteRequestDuration = "writeReqDurationNs" // Number of (wall-time) nanoseconds spent inside write requests
|
||||
statRequestsActive = "reqActive" // Number of currently active requests
|
||||
statWriteRequestsActive = "writeReqActive" // Number of currently active write requests
|
||||
statClientError = "clientError" // Number of HTTP responses due to client error
|
||||
statServerError = "serverError" // Number of HTTP responses due to server error
|
||||
)
|
||||
|
||||
// Service manages the listener and handler for an HTTP endpoint.
|
||||
|
|
Loading…
Reference in New Issue