Remove deprecated JSON write path

pull/6025/head
gunnaraasen 2016-03-15 19:52:41 -07:00
parent 0703b85589
commit d96eef4c52
5 changed files with 32 additions and 431 deletions

View File

@ -341,60 +341,6 @@ func TestServer_UserCommands(t *testing.T) {
}
}
// Ensure the server rejects a single point via json protocol by default.
func TestServer_Write_JSON(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
t.Fatal(err)
}
// Verify writing JSON points returns an error.
now := now()
res, err := s.Write("", "", fmt.Sprintf(`{"database" : "db0", "retentionPolicy" : "rp0", "points": [{"measurement": "cpu", "tags": {"host": "server02"},"fields": {"value": 1.0}}],"time":"%s"} `, now.Format(time.RFC3339Nano)), nil)
if err == nil {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", ``, res)
} else if exp := `JSON write protocol has been deprecated`; !strings.Contains(err.Error(), exp) {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, err.Error())
}
// Verify no data has been written.
if res, err := s.Query(`SELECT * FROM db0.rp0.cpu GROUP BY *`); err != nil {
t.Fatal(err)
} else if exp := `{"results":[{}]}`; exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
}
}
// Ensure the server can create a single point via json protocol and read it back.
func TestServer_Write_JSON_Enabled(t *testing.T) {
t.Parallel()
c := NewConfig()
c.HTTPD.JSONWriteEnabled = true
s := OpenServer(c)
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
t.Fatal(err)
}
now := now()
if res, err := s.Write("", "", fmt.Sprintf(`{"database" : "db0", "retentionPolicy" : "rp0", "points": [{"measurement": "cpu", "tags": {"host": "server02"},"fields": {"value": 1.0}}],"time":"%s"} `, now.Format(time.RFC3339Nano)), nil); err != nil {
t.Fatal(err)
} else if exp := ``; exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
}
// Verify the data was written.
if res, err := s.Query(`SELECT * FROM db0.rp0.cpu GROUP BY *`); err != nil {
t.Fatal(err)
} else if exp := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server02"},"columns":["time","value"],"values":[["%s",1]]}]}]}`, now.Format(time.RFC3339Nano)); exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
}
}
// Ensure the server can create a single point via line protocol with float type and read it back.
func TestServer_Write_LineProtocol_Float(t *testing.T) {
t.Parallel()

View File

@ -13,7 +13,6 @@ type Config struct {
PprofEnabled bool `toml:"pprof-enabled"`
HTTPSEnabled bool `toml:"https-enabled"`
HTTPSCertificate string `toml:"https-certificate"`
JSONWriteEnabled bool `toml:"json-write-enabled"`
}
// NewConfig returns a new Config with default settings.
@ -24,6 +23,5 @@ func NewConfig() Config {
LogEnabled: true,
HTTPSEnabled: false,
HTTPSCertificate: "/etc/ssl/influxdb.pem",
JSONWriteEnabled: false,
}
}

View File

@ -18,7 +18,6 @@ import (
"github.com/bmizerany/pat"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/client"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/continuous_querier"
@ -73,22 +72,20 @@ type Handler struct {
ContinuousQuerier continuous_querier.ContinuousQuerier
Logger *log.Logger
loggingEnabled bool // Log every HTTP access.
WriteTrace bool // Detailed logging of write path
JSONWriteEnabled bool // Allow JSON writes
statMap *expvar.Map
Logger *log.Logger
loggingEnabled bool // Log every HTTP access.
WriteTrace bool // Detailed logging of write path
statMap *expvar.Map
}
// NewHandler returns a new instance of handler with routes.
func NewHandler(requireAuthentication, loggingEnabled, writeTrace, JSONWriteEnabled bool, statMap *expvar.Map) *Handler {
func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap *expvar.Map) *Handler {
h := &Handler{
mux: pat.New(),
requireAuthentication: requireAuthentication,
Logger: log.New(os.Stderr, "[http] ", log.LstdFlags),
loggingEnabled: loggingEnabled,
WriteTrace: writeTrace,
JSONWriteEnabled: JSONWriteEnabled,
statMap: statMap,
}
@ -371,12 +368,37 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
}
}
// serveWrite receives incoming series data in line protocol format and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
h.statMap.Add(statWriteRequest, 1)
defer func(start time.Time) {
h.statMap.Add(statWriteRequestDuration, time.Since(start).Nanoseconds())
}(time.Now())
database := r.FormValue("db")
if database == "" {
resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
return
}
if di, err := h.MetaClient.Database(database); err != nil {
resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
return
} else if di == nil {
resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
return
}
if h.requireAuthentication && user == nil {
resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
return
}
if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, database) {
resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
return
}
// Handle gzip decoding of the body
body := r.Body
if r.Header.Get("Content-encoding") == "gzip" {
@ -413,105 +435,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
h.Logger.Printf("write body received by handler: %s", buf.Bytes())
}
if r.Header.Get("Content-Type") == "application/json" {
h.serveWriteJSON(w, r, buf.Bytes(), user)
return
}
h.serveWriteLine(w, r, buf.Bytes(), user)
}
// serveWriteJSON receives incoming series data in JSON and writes it to the database.
func (h *Handler) serveWriteJSON(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) {
if !h.JSONWriteEnabled {
resultError(w, influxql.Result{Err: fmt.Errorf("JSON write protocol has been deprecated")}, http.StatusBadRequest)
return
}
var bp client.BatchPoints
var dec *json.Decoder
dec = json.NewDecoder(bytes.NewReader(body))
if err := dec.Decode(&bp); err != nil {
if err.Error() == "EOF" {
w.WriteHeader(http.StatusOK)
return
}
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
}
if bp.Database == "" {
resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
return
}
if di, err := h.MetaClient.Database(bp.Database); err != nil {
resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
return
} else if di == nil {
resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", bp.Database)}, http.StatusNotFound)
return
}
if h.requireAuthentication && user == nil {
resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", bp.Database)}, http.StatusUnauthorized)
return
}
if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, bp.Database) {
resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, bp.Database)}, http.StatusUnauthorized)
return
}
points, err := NormalizeBatchPoints(bp)
if err != nil {
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
}
// Convert the json batch struct to a points writer struct
if err := h.PointsWriter.WritePoints(bp.Database, bp.RetentionPolicy, models.ConsistencyLevelAny, points); err != nil {
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
if influxdb.IsClientError(err) {
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
} else {
resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
}
return
}
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
w.WriteHeader(http.StatusNoContent)
}
// serveWriteLine receives incoming series data in line protocol format and writes it to the database.
func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) {
// Some clients may not set the content-type header appropriately and send JSON with a non-json
// content-type. If the body looks JSON, try to handle it as as JSON instead
if len(body) > 0 {
var i int
for {
// JSON requests must start w/ an opening bracket
if body[i] == '{' {
h.serveWriteJSON(w, r, body, user)
return
}
// check that the byte is in the standard ascii code range
if body[i] > 32 || i >= len(body)-1 {
break
}
i++
}
}
precision := r.FormValue("precision")
if precision == "" {
precision = "n"
}
points, parseError := models.ParsePointsWithPrecision(body, time.Now().UTC(), precision)
points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.FormValue("precision"))
// Not points parsed correctly so return the error now
if parseError != nil && len(points) == 0 {
if parseError.Error() == "EOF" {
@ -522,30 +446,6 @@ func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []
return
}
database := r.FormValue("db")
if database == "" {
resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
return
}
if di, err := h.MetaClient.Database(database); err != nil {
resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
return
} else if di == nil {
resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
return
}
if h.requireAuthentication && user == nil {
resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
return
}
if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, database) {
resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
return
}
// Write points.
if err := h.PointsWriter.WritePoints(database, r.FormValue("rp"), models.ConsistencyLevelAny, points); influxdb.IsClientError(err) {
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
@ -628,22 +528,6 @@ func MarshalJSON(v interface{}, pretty bool) []byte {
return b
}
// Point represents an InfluxDB point.
type Point struct {
Name string `json:"name"`
Time time.Time `json:"time"`
Tags map[string]string `json:"tags"`
Fields map[string]interface{} `json:"fields"`
}
// Batch is a collection of points associated with a database, having a
// certain retention policy.
type Batch struct {
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Points []Point `json:"points"`
}
// serveExpvar serves registered expvar information over HTTP.
func serveExpvar(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
@ -907,49 +791,3 @@ func (r *Response) Error() error {
}
return nil
}
// NormalizeBatchPoints returns a slice of Points, created by populating individual
// points within the batch, which do not have times or tags, with the top-level
// values.
func NormalizeBatchPoints(bp client.BatchPoints) ([]models.Point, error) {
points := []models.Point{}
for _, p := range bp.Points {
if p.Time.IsZero() {
if bp.Time.IsZero() {
p.Time = time.Now()
} else {
p.Time = bp.Time
}
}
if p.Precision == "" && bp.Precision != "" {
p.Precision = bp.Precision
}
p.Time = client.SetPrecision(p.Time, p.Precision)
if len(bp.Tags) > 0 {
if p.Tags == nil {
p.Tags = make(map[string]string)
}
for k := range bp.Tags {
if p.Tags[k] == "" {
p.Tags[k] = bp.Tags[k]
}
}
}
if p.Measurement == "" {
return points, fmt.Errorf("missing measurement")
}
if len(p.Fields) == 0 {
return points, fmt.Errorf("missing fields")
}
// Need to convert from a client.Point to a influxdb.Point
pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
if err != nil {
return points, err
}
points = append(points, pt)
}
return points, nil
}

View File

@ -2,136 +2,21 @@ package httpd_test
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"reflect"
"regexp"
"testing"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/client"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/httpd"
"github.com/influxdata/influxdb/services/meta"
)
func TestBatchWrite_UnmarshalEpoch(t *testing.T) {
now := time.Now().UTC()
tests := []struct {
name string
epoch int64
precision string
expected time.Time
}{
{
name: "nanoseconds",
epoch: now.UnixNano(),
precision: "n",
expected: now,
},
{
name: "microseconds",
epoch: now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond),
precision: "u",
expected: now.Round(time.Microsecond),
},
{
name: "milliseconds",
epoch: now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond),
precision: "ms",
expected: now.Round(time.Millisecond),
},
{
name: "seconds",
epoch: now.Round(time.Second).UnixNano() / int64(time.Second),
precision: "s",
expected: now.Round(time.Second),
},
{
name: "minutes",
epoch: now.Round(time.Minute).UnixNano() / int64(time.Minute),
precision: "m",
expected: now.Round(time.Minute),
},
{
name: "hours",
epoch: now.Round(time.Hour).UnixNano() / int64(time.Hour),
precision: "h",
expected: now.Round(time.Hour),
},
{
name: "max int64",
epoch: 9223372036854775807,
precision: "n",
expected: time.Unix(0, 9223372036854775807),
},
{
name: "100 years from now",
epoch: now.Add(time.Hour * 24 * 365 * 100).UnixNano(),
precision: "n",
expected: now.Add(time.Hour * 24 * 365 * 100),
},
}
for _, test := range tests {
t.Logf("testing %q\n", test.name)
data := []byte(fmt.Sprintf(`{"time": %d, "precision":"%s"}`, test.epoch, test.precision))
t.Logf("json: %s", string(data))
var bp client.BatchPoints
err := json.Unmarshal(data, &bp)
if err != nil {
t.Fatalf("unexpected error. expected: %v, actual: %v", nil, err)
}
if !bp.Time.Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, bp.Time)
}
}
}
func TestBatchWrite_UnmarshalRFC(t *testing.T) {
now := time.Now()
tests := []struct {
name string
rfc string
now time.Time
expected time.Time
}{
{
name: "RFC3339Nano",
rfc: time.RFC3339Nano,
now: now,
expected: now,
},
{
name: "RFC3339",
rfc: time.RFC3339,
now: now.Round(time.Second),
expected: now.Round(time.Second),
},
}
for _, test := range tests {
t.Logf("testing %q\n", test.name)
ts := test.now.Format(test.rfc)
data := []byte(fmt.Sprintf(`{"time": %q}`, ts))
t.Logf("json: %s", string(data))
var bp client.BatchPoints
err := json.Unmarshal(data, &bp)
if err != nil {
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
}
if !bp.Time.Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, bp.Time)
}
}
}
// Ensure the handler returns results from a query (including nil results).
func TestHandler_Query(t *testing.T) {
h := NewHandler(false)
@ -391,71 +276,6 @@ type invalidJSON struct{}
func (*invalidJSON) MarshalJSON() ([]byte, error) { return nil, errors.New("marker") }
func TestNormalizeBatchPoints(t *testing.T) {
now := time.Now()
tests := []struct {
name string
bp client.BatchPoints
p []models.Point
err string
}{
{
name: "default",
bp: client.BatchPoints{
Points: []client.Point{
{Measurement: "cpu", Tags: map[string]string{"region": "useast"}, Time: now, Fields: map[string]interface{}{"value": 1.0}},
},
},
p: []models.Point{
models.MustNewPoint("cpu", map[string]string{"region": "useast"}, map[string]interface{}{"value": 1.0}, now),
},
},
{
name: "merge time",
bp: client.BatchPoints{
Time: now,
Points: []client.Point{
{Measurement: "cpu", Tags: map[string]string{"region": "useast"}, Fields: map[string]interface{}{"value": 1.0}},
},
},
p: []models.Point{
models.MustNewPoint("cpu", map[string]string{"region": "useast"}, map[string]interface{}{"value": 1.0}, now),
},
},
{
name: "merge tags",
bp: client.BatchPoints{
Tags: map[string]string{"day": "monday"},
Points: []client.Point{
{Measurement: "cpu", Tags: map[string]string{"region": "useast"}, Time: now, Fields: map[string]interface{}{"value": 1.0}},
{Measurement: "memory", Time: now, Fields: map[string]interface{}{"value": 2.0}},
},
},
p: []models.Point{
models.MustNewPoint("cpu", map[string]string{"day": "monday", "region": "useast"}, map[string]interface{}{"value": 1.0}, now),
models.MustNewPoint("memory", map[string]string{"day": "monday"}, map[string]interface{}{"value": 2.0}, now),
},
},
}
for _, test := range tests {
t.Logf("running test %q", test.name)
p, e := httpd.NormalizeBatchPoints(test.bp)
if test.err == "" && e != nil {
t.Errorf("unexpected error %v", e)
} else if test.err != "" && e == nil {
t.Errorf("expected error %s, got <nil>", test.err)
} else if e != nil && test.err != e.Error() {
t.Errorf("unexpected error. expected: %s, got %v", test.err, e)
}
if !reflect.DeepEqual(p, test.p) {
t.Logf("expected: %+v", test.p)
t.Logf("got: %+v", p)
t.Error("failed to normalize.")
}
}
}
// NewHandler represents a test wrapper for httpd.Handler.
type Handler struct {
*httpd.Handler
@ -467,7 +287,7 @@ type Handler struct {
func NewHandler(requireAuthentication bool) *Handler {
statMap := influxdb.NewStatistics("httpd", "httpd", nil)
h := &Handler{
Handler: httpd.NewHandler(requireAuthentication, true, false, false, statMap),
Handler: httpd.NewHandler(requireAuthentication, true, false, statMap),
}
h.Handler.MetaClient = &h.MetaClient
h.Handler.QueryExecutor = &h.QueryExecutor

View File

@ -64,7 +64,6 @@ func NewService(c Config) *Service {
c.AuthEnabled,
c.LogEnabled,
c.WriteTracing,
c.JSONWriteEnabled,
statMap,
),
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),