diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 0838deaa22..35e44492cc 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -1,6 +1,7 @@ package httpd import ( + "bytes" "compress/gzip" "encoding/json" "errors" @@ -95,10 +96,6 @@ func NewHandler(requireAuthentication, loggingEnabled bool, version string) *Han "write", // Data-ingest route. "POST", "/write", true, true, h.serveWrite, }, - route{ - "write_points", // Data-ingest route. - "POST", "/write_points", true, true, h.serveWritePoints, - }, route{ // Ping "ping", "GET", "/ping", true, true, h.servePing, @@ -261,37 +258,45 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. } } -// serveWrite receives incoming series data and writes it to the database. func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { - // Check to see if we have a gzip'd post - var body io.ReadCloser + + // Handle gzip decoding of the body + body := r.Body if r.Header.Get("Content-encoding") == "gzip" { b, err := gzip.NewReader(r.Body) if err != nil { - resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) + h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) return } body = b - defer r.Body.Close() - } else { - body = r.Body + } + defer body.Close() + + b, err := ioutil.ReadAll(body) + if err != nil { + if h.WriteTrace { + h.Logger.Print("write handler unable to read bytes from request body") + } + h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) + return + } + if h.WriteTrace { + h.Logger.Printf("write body received by handler: %s", string(b)) } + if r.Header.Get("Content-Type") == "application/json" { + h.serveWriteJSON(w, r, b, user) + return + } + h.serveWriteLine(w, r, b, user) +} + +// serveWriteJSON receives incoming series data in JSON and writes it to the database. +func (h *Handler) serveWriteJSON(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) { var bp client.BatchPoints var dec *json.Decoder - if h.WriteTrace { - b, err := ioutil.ReadAll(body) - if err != nil { - h.Logger.Print("write handler unable to read bytes from request body") - } else { - h.Logger.Printf("write body received by handler: %s", string(b)) - } - dec = json.NewDecoder(strings.NewReader(string(b))) - } else { - dec = json.NewDecoder(body) - defer body.Close() - } + dec = json.NewDecoder(bytes.NewReader(body)) if err := dec.Decode(&bp); err != nil { if err.Error() == "EOF" { @@ -343,77 +348,56 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta. w.WriteHeader(http.StatusNoContent) } -// serveWritePoints receives incoming series data and writes it to the database. -func (h *Handler) serveWritePoints(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { - var writeError = func(result influxql.Result, statusCode int) { - w.WriteHeader(statusCode) - w.Write([]byte(result.Err.Error())) - return - } +func (h *Handler) writeError(w http.ResponseWriter, result influxql.Result, statusCode int) { + w.WriteHeader(statusCode) + w.Write([]byte(result.Err.Error())) +} - // Check to see if we have a gzip'd post - var body io.ReadCloser - if r.Header.Get("Content-encoding") == "gzip" { - b, err := gzip.NewReader(r.Body) - if err != nil { - writeError(influxql.Result{Err: err}, http.StatusBadRequest) - return - } - body = b - defer r.Body.Close() - } else { - body = r.Body - defer r.Body.Close() - } - - b, err := ioutil.ReadAll(body) - if err != nil { - if h.WriteTrace { - h.Logger.Print("write handler unable to read bytes from request body") - } - writeError(influxql.Result{Err: err}, http.StatusBadRequest) +// serveWriteLine receives incoming series data in line protocol format and writes it to the database. +func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) { + // Some clients may not set the content-type header appropriately and send JSON with a non-json + // content-type. If the body looks JSON, try to handle it as as JSON instead + if len(body) > 0 && body[0] == '{' { + h.serveWriteJSON(w, r, body, user) return } - if h.WriteTrace { - h.Logger.Printf("write body received by handler: %s", string(b)) - } precision := r.FormValue("precision") if precision == "" { precision = "n" } - points, err := tsdb.ParsePointsWithPrecision(b, time.Now().UTC(), precision) + points, err := tsdb.ParsePointsWithPrecision(body, time.Now().UTC(), precision) if err != nil { if err.Error() == "EOF" { w.WriteHeader(http.StatusOK) return } - writeError(influxql.Result{Err: err}, http.StatusBadRequest) + h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) return } database := r.FormValue("db") if database == "" { - writeError(influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest) + h.writeError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest) return } if di, err := h.MetaStore.Database(database); err != nil { - resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError) + h.writeError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError) return } else if di == nil { - resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound) + h.writeError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound) return } if h.requireAuthentication && user == nil { - resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized) + h.writeError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized) return } if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, database) { - resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized) + h.writeError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized) return } @@ -437,10 +421,10 @@ func (h *Handler) serveWritePoints(w http.ResponseWriter, r *http.Request, user ConsistencyLevel: consistency, Points: points, }); influxdb.IsClientError(err) { - writeError(influxql.Result{Err: err}, http.StatusBadRequest) + h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) return } else if err != nil { - writeError(influxql.Result{Err: err}, http.StatusInternalServerError) + h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError) return }