Use content negotiation for /write API

If content-type is "application/json", we'll process the request as
of old JSON write API.  Otherwise, we assume line protocol but check
the first byte in case a older client is still sending JSON without the
correct content-type header.
pull/2721/head
Jason Wilder 2015-06-01 11:15:46 -06:00
parent 50be500777
commit badb2bf057
1 changed files with 47 additions and 63 deletions

View File

@ -1,6 +1,7 @@
package httpd
import (
"bytes"
"compress/gzip"
"encoding/json"
"errors"
@ -95,10 +96,6 @@ func NewHandler(requireAuthentication, loggingEnabled bool, version string) *Han
"write", // Data-ingest route.
"POST", "/write", true, true, h.serveWrite,
},
route{
"write_points", // Data-ingest route.
"POST", "/write_points", true, true, h.serveWritePoints,
},
route{ // Ping
"ping",
"GET", "/ping", true, true, h.servePing,
@ -261,37 +258,45 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
}
}
// serveWrite receives incoming series data and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
// Check to see if we have a gzip'd post
var body io.ReadCloser
// Handle gzip decoding of the body
body := r.Body
if r.Header.Get("Content-encoding") == "gzip" {
b, err := gzip.NewReader(r.Body)
if err != nil {
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
}
body = b
defer r.Body.Close()
} else {
body = r.Body
}
defer body.Close()
b, err := ioutil.ReadAll(body)
if err != nil {
if h.WriteTrace {
h.Logger.Print("write handler unable to read bytes from request body")
}
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
}
if h.WriteTrace {
h.Logger.Printf("write body received by handler: %s", string(b))
}
if r.Header.Get("Content-Type") == "application/json" {
h.serveWriteJSON(w, r, b, user)
return
}
h.serveWriteLine(w, r, b, user)
}
// serveWriteJSON receives incoming series data in JSON and writes it to the database.
func (h *Handler) serveWriteJSON(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) {
var bp client.BatchPoints
var dec *json.Decoder
if h.WriteTrace {
b, err := ioutil.ReadAll(body)
if err != nil {
h.Logger.Print("write handler unable to read bytes from request body")
} else {
h.Logger.Printf("write body received by handler: %s", string(b))
}
dec = json.NewDecoder(strings.NewReader(string(b)))
} else {
dec = json.NewDecoder(body)
defer body.Close()
}
dec = json.NewDecoder(bytes.NewReader(body))
if err := dec.Decode(&bp); err != nil {
if err.Error() == "EOF" {
@ -343,77 +348,56 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
w.WriteHeader(http.StatusNoContent)
}
// serveWritePoints receives incoming series data and writes it to the database.
func (h *Handler) serveWritePoints(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
var writeError = func(result influxql.Result, statusCode int) {
w.WriteHeader(statusCode)
w.Write([]byte(result.Err.Error()))
return
}
func (h *Handler) writeError(w http.ResponseWriter, result influxql.Result, statusCode int) {
w.WriteHeader(statusCode)
w.Write([]byte(result.Err.Error()))
}
// Check to see if we have a gzip'd post
var body io.ReadCloser
if r.Header.Get("Content-encoding") == "gzip" {
b, err := gzip.NewReader(r.Body)
if err != nil {
writeError(influxql.Result{Err: err}, http.StatusBadRequest)
return
}
body = b
defer r.Body.Close()
} else {
body = r.Body
defer r.Body.Close()
}
b, err := ioutil.ReadAll(body)
if err != nil {
if h.WriteTrace {
h.Logger.Print("write handler unable to read bytes from request body")
}
writeError(influxql.Result{Err: err}, http.StatusBadRequest)
// serveWriteLine receives incoming series data in line protocol format and writes it to the database.
func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) {
// Some clients may not set the content-type header appropriately and send JSON with a non-json
// content-type. If the body looks JSON, try to handle it as as JSON instead
if len(body) > 0 && body[0] == '{' {
h.serveWriteJSON(w, r, body, user)
return
}
if h.WriteTrace {
h.Logger.Printf("write body received by handler: %s", string(b))
}
precision := r.FormValue("precision")
if precision == "" {
precision = "n"
}
points, err := tsdb.ParsePointsWithPrecision(b, time.Now().UTC(), precision)
points, err := tsdb.ParsePointsWithPrecision(body, time.Now().UTC(), precision)
if err != nil {
if err.Error() == "EOF" {
w.WriteHeader(http.StatusOK)
return
}
writeError(influxql.Result{Err: err}, http.StatusBadRequest)
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
}
database := r.FormValue("db")
if database == "" {
writeError(influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
h.writeError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
return
}
if di, err := h.MetaStore.Database(database); err != nil {
resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
h.writeError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
return
} else if di == nil {
resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
h.writeError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
return
}
if h.requireAuthentication && user == nil {
resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
h.writeError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
return
}
if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, database) {
resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
h.writeError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
return
}
@ -437,10 +421,10 @@ func (h *Handler) serveWritePoints(w http.ResponseWriter, r *http.Request, user
ConsistencyLevel: consistency,
Points: points,
}); influxdb.IsClientError(err) {
writeError(influxql.Result{Err: err}, http.StatusBadRequest)
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
} else if err != nil {
writeError(influxql.Result{Err: err}, http.StatusInternalServerError)
h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
return
}