From badb2bf057a57ddf06a1f1f4d8c5619fb8d50663 Mon Sep 17 00:00:00 2001
From: Jason Wilder <jason@influxdb.com>
Date: Mon, 1 Jun 2015 11:15:46 -0600
Subject: [PATCH] Use content negotiation for /write API

If content-type is "application/json", we'll process the request as
of old JSON write API.  Otherwise, we assume line protocol but check
the first byte in case a older client is still sending JSON without the
correct content-type header.
---
 services/httpd/handler.go | 110 ++++++++++++++++----------------------
 1 file changed, 47 insertions(+), 63 deletions(-)

diff --git a/services/httpd/handler.go b/services/httpd/handler.go
index 0838deaa22..35e44492cc 100644
--- a/services/httpd/handler.go
+++ b/services/httpd/handler.go
@@ -1,6 +1,7 @@
 package httpd
 
 import (
+	"bytes"
 	"compress/gzip"
 	"encoding/json"
 	"errors"
@@ -95,10 +96,6 @@ func NewHandler(requireAuthentication, loggingEnabled bool, version string) *Han
 			"write", // Data-ingest route.
 			"POST", "/write", true, true, h.serveWrite,
 		},
-		route{
-			"write_points", // Data-ingest route.
-			"POST", "/write_points", true, true, h.serveWritePoints,
-		},
 		route{ // Ping
 			"ping",
 			"GET", "/ping", true, true, h.servePing,
@@ -261,37 +258,45 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
 	}
 }
 
-// serveWrite receives incoming series data and writes it to the database.
 func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
-	// Check to see if we have a gzip'd post
-	var body io.ReadCloser
+
+	// Handle gzip decoding of the body
+	body := r.Body
 	if r.Header.Get("Content-encoding") == "gzip" {
 		b, err := gzip.NewReader(r.Body)
 		if err != nil {
-			resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
+			h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
 			return
 		}
 		body = b
-		defer r.Body.Close()
-	} else {
-		body = r.Body
+	}
+	defer body.Close()
+
+	b, err := ioutil.ReadAll(body)
+	if err != nil {
+		if h.WriteTrace {
+			h.Logger.Print("write handler unable to read bytes from request body")
+		}
+		h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
+		return
+	}
+	if h.WriteTrace {
+		h.Logger.Printf("write body received by handler: %s", string(b))
 	}
 
+	if r.Header.Get("Content-Type") == "application/json" {
+		h.serveWriteJSON(w, r, b, user)
+		return
+	}
+	h.serveWriteLine(w, r, b, user)
+}
+
+// serveWriteJSON receives incoming series data in JSON and writes it to the database.
+func (h *Handler) serveWriteJSON(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) {
 	var bp client.BatchPoints
 	var dec *json.Decoder
 
-	if h.WriteTrace {
-		b, err := ioutil.ReadAll(body)
-		if err != nil {
-			h.Logger.Print("write handler unable to read bytes from request body")
-		} else {
-			h.Logger.Printf("write body received by handler: %s", string(b))
-		}
-		dec = json.NewDecoder(strings.NewReader(string(b)))
-	} else {
-		dec = json.NewDecoder(body)
-		defer body.Close()
-	}
+	dec = json.NewDecoder(bytes.NewReader(body))
 
 	if err := dec.Decode(&bp); err != nil {
 		if err.Error() == "EOF" {
@@ -343,77 +348,56 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
 	w.WriteHeader(http.StatusNoContent)
 }
 
-// serveWritePoints receives incoming series data and writes it to the database.
-func (h *Handler) serveWritePoints(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
-	var writeError = func(result influxql.Result, statusCode int) {
-		w.WriteHeader(statusCode)
-		w.Write([]byte(result.Err.Error()))
-		return
-	}
+func (h *Handler) writeError(w http.ResponseWriter, result influxql.Result, statusCode int) {
+	w.WriteHeader(statusCode)
+	w.Write([]byte(result.Err.Error()))
+}
 
-	// Check to see if we have a gzip'd post
-	var body io.ReadCloser
-	if r.Header.Get("Content-encoding") == "gzip" {
-		b, err := gzip.NewReader(r.Body)
-		if err != nil {
-			writeError(influxql.Result{Err: err}, http.StatusBadRequest)
-			return
-		}
-		body = b
-		defer r.Body.Close()
-	} else {
-		body = r.Body
-		defer r.Body.Close()
-	}
-
-	b, err := ioutil.ReadAll(body)
-	if err != nil {
-		if h.WriteTrace {
-			h.Logger.Print("write handler unable to read bytes from request body")
-		}
-		writeError(influxql.Result{Err: err}, http.StatusBadRequest)
+// serveWriteLine receives incoming series data in line protocol format and writes it to the database.
+func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) {
+	// Some clients may not set the content-type header appropriately and send JSON with a non-json
+	// content-type.  If the body looks JSON, try to handle it as as JSON instead
+	if len(body) > 0 && body[0] == '{' {
+		h.serveWriteJSON(w, r, body, user)
 		return
 	}
-	if h.WriteTrace {
-		h.Logger.Printf("write body received by handler: %s", string(b))
-	}
 
 	precision := r.FormValue("precision")
 	if precision == "" {
 		precision = "n"
 	}
 
-	points, err := tsdb.ParsePointsWithPrecision(b, time.Now().UTC(), precision)
+	points, err := tsdb.ParsePointsWithPrecision(body, time.Now().UTC(), precision)
 	if err != nil {
 		if err.Error() == "EOF" {
 			w.WriteHeader(http.StatusOK)
 			return
 		}
-		writeError(influxql.Result{Err: err}, http.StatusBadRequest)
+		h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
 		return
 	}
 
 	database := r.FormValue("db")
 	if database == "" {
-		writeError(influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
+		h.writeError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
 		return
 	}
 
 	if di, err := h.MetaStore.Database(database); err != nil {
-		resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
+		h.writeError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
 		return
 	} else if di == nil {
-		resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
+		h.writeError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
 		return
 	}
 
 	if h.requireAuthentication && user == nil {
-		resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
+		h.writeError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
 		return
 	}
 
 	if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, database) {
-		resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
+		h.writeError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
 		return
 	}
 
@@ -437,10 +421,10 @@ func (h *Handler) serveWritePoints(w http.ResponseWriter, r *http.Request, user
 		ConsistencyLevel: consistency,
 		Points:           points,
 	}); influxdb.IsClientError(err) {
-		writeError(influxql.Result{Err: err}, http.StatusBadRequest)
+		h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
 		return
 	} else if err != nil {
-		writeError(influxql.Result{Err: err}, http.StatusInternalServerError)
+		h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
 		return
 	}