Remove deprecated JSON write path
parent 0703b85589
commit d96eef4c52
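The diff below removes the JSON write path end to end: the server tests for it, the json-write-enabled config option, the serveWriteJSON/serveWriteLine split in the HTTP handler, the Point/Batch JSON types, NormalizeBatchPoints, and the handler tests that exercised client.BatchPoints. Writes are accepted in line protocol only. A hedged, standalone sketch of a line-protocol write for orientation (the /write URL and port are assumed from the usual InfluxDB HTTP API and are not part of this diff; db, rp, and precision correspond to the r.FormValue calls in serveWrite below):

package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Line protocol is the only body format the write handler accepts after
	// this change; the JSON batch body used by the removed tests is gone.
	// Host and port are placeholders for a local InfluxDB instance.
	body := strings.NewReader("cpu,host=server02 value=1.0")
	resp, err := http.Post("http://localhost:8086/write?db=db0&rp=rp0&precision=n", "text/plain", body)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // a successful write returns 204 No Content
}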
@@ -341,60 +341,6 @@ func TestServer_UserCommands(t *testing.T) {
 	}
 }
 
-// Ensure the server rejects a single point via json protocol by default.
-func TestServer_Write_JSON(t *testing.T) {
-	t.Parallel()
-	s := OpenServer(NewConfig())
-	defer s.Close()
-
-	if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
-		t.Fatal(err)
-	}
-
-	// Verify writing JSON points returns an error.
-	now := now()
-	res, err := s.Write("", "", fmt.Sprintf(`{"database" : "db0", "retentionPolicy" : "rp0", "points": [{"measurement": "cpu", "tags": {"host": "server02"},"fields": {"value": 1.0}}],"time":"%s"} `, now.Format(time.RFC3339Nano)), nil)
-	if err == nil {
-		t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", ``, res)
-	} else if exp := `JSON write protocol has been deprecated`; !strings.Contains(err.Error(), exp) {
-		t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, err.Error())
-	}
-
-	// Verify no data has been written.
-	if res, err := s.Query(`SELECT * FROM db0.rp0.cpu GROUP BY *`); err != nil {
-		t.Fatal(err)
-	} else if exp := `{"results":[{}]}`; exp != res {
-		t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
-	}
-}
-
-// Ensure the server can create a single point via json protocol and read it back.
-func TestServer_Write_JSON_Enabled(t *testing.T) {
-	t.Parallel()
-	c := NewConfig()
-	c.HTTPD.JSONWriteEnabled = true
-	s := OpenServer(c)
-	defer s.Close()
-
-	if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
-		t.Fatal(err)
-	}
-
-	now := now()
-	if res, err := s.Write("", "", fmt.Sprintf(`{"database" : "db0", "retentionPolicy" : "rp0", "points": [{"measurement": "cpu", "tags": {"host": "server02"},"fields": {"value": 1.0}}],"time":"%s"} `, now.Format(time.RFC3339Nano)), nil); err != nil {
-		t.Fatal(err)
-	} else if exp := ``; exp != res {
-		t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
-	}
-
-	// Verify the data was written.
-	if res, err := s.Query(`SELECT * FROM db0.rp0.cpu GROUP BY *`); err != nil {
-		t.Fatal(err)
-	} else if exp := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server02"},"columns":["time","value"],"values":[["%s",1]]}]}]}`, now.Format(time.RFC3339Nano)); exp != res {
-		t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
-	}
-}
-
 // Ensure the server can create a single point via line protocol with float type and read it back.
 func TestServer_Write_LineProtocol_Float(t *testing.T) {
 	t.Parallel()
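The removed tests above drove writes through a JSON batch body; the remaining line-protocol tests cover the same data. A small hedged sketch of the equivalence, not part of the diff (the payload layout mirrors the removed test body):

package main

import (
	"fmt"
	"time"
)

func main() {
	// The removed TestServer_Write_JSON payload:
	//   {"database":"db0","retentionPolicy":"rp0",
	//    "points":[{"measurement":"cpu","tags":{"host":"server02"},"fields":{"value":1.0}}],
	//    "time":"<RFC3339Nano>"}
	// carries the same information as a single line-protocol point with a
	// nanosecond timestamp, which is what the surviving tests write:
	now := time.Now().UTC()
	fmt.Printf("cpu,host=server02 value=1.0 %d\n", now.UnixNano())
}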
@@ -13,7 +13,6 @@ type Config struct {
 	PprofEnabled     bool   `toml:"pprof-enabled"`
 	HTTPSEnabled     bool   `toml:"https-enabled"`
 	HTTPSCertificate string `toml:"https-certificate"`
-	JSONWriteEnabled bool   `toml:"json-write-enabled"`
 }
 
 // NewConfig returns a new Config with default settings.
@@ -24,6 +23,5 @@ func NewConfig() Config {
 		LogEnabled:       true,
 		HTTPSEnabled:     false,
 		HTTPSCertificate: "/etc/ssl/influxdb.pem",
-		JSONWriteEnabled: false,
 	}
 }
@@ -18,7 +18,6 @@ import (
 
 	"github.com/bmizerany/pat"
 	"github.com/influxdata/influxdb"
-	"github.com/influxdata/influxdb/client"
 	"github.com/influxdata/influxdb/influxql"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/services/continuous_querier"
@@ -73,22 +72,20 @@ type Handler struct {
 
 	ContinuousQuerier continuous_querier.ContinuousQuerier
 
-	Logger           *log.Logger
-	loggingEnabled   bool // Log every HTTP access.
-	WriteTrace       bool // Detailed logging of write path
-	JSONWriteEnabled bool // Allow JSON writes
-	statMap          *expvar.Map
+	Logger         *log.Logger
+	loggingEnabled bool // Log every HTTP access.
+	WriteTrace     bool // Detailed logging of write path
+	statMap        *expvar.Map
 }
 
 // NewHandler returns a new instance of handler with routes.
-func NewHandler(requireAuthentication, loggingEnabled, writeTrace, JSONWriteEnabled bool, statMap *expvar.Map) *Handler {
+func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap *expvar.Map) *Handler {
 	h := &Handler{
 		mux: pat.New(),
 		requireAuthentication: requireAuthentication,
 		Logger:                log.New(os.Stderr, "[http] ", log.LstdFlags),
 		loggingEnabled:        loggingEnabled,
 		WriteTrace:            writeTrace,
-		JSONWriteEnabled:      JSONWriteEnabled,
 		statMap:               statMap,
 	}
 
@@ -371,12 +368,37 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
 	}
 }
 
 // serveWrite receives incoming series data in line protocol format and writes it to the database.
 func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
 	h.statMap.Add(statWriteRequest, 1)
 	defer func(start time.Time) {
 		h.statMap.Add(statWriteRequestDuration, time.Since(start).Nanoseconds())
 	}(time.Now())
+
+	database := r.FormValue("db")
+	if database == "" {
+		resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
+		return
+	}
+
+	if di, err := h.MetaClient.Database(database); err != nil {
+		resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
+		return
+	} else if di == nil {
+		resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
+		return
+	}
+
+	if h.requireAuthentication && user == nil {
+		resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
+		return
+	}
+
+	if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, database) {
+		resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
+		return
+	}
+
 	// Handle gzip decoding of the body
 	body := r.Body
 	if r.Header.Get("Content-encoding") == "gzip" {
@@ -413,105 +435,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
 		h.Logger.Printf("write body received by handler: %s", buf.Bytes())
 	}
 
-	if r.Header.Get("Content-Type") == "application/json" {
-		h.serveWriteJSON(w, r, buf.Bytes(), user)
-		return
-	}
-	h.serveWriteLine(w, r, buf.Bytes(), user)
-}
-
-// serveWriteJSON receives incoming series data in JSON and writes it to the database.
-func (h *Handler) serveWriteJSON(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) {
-	if !h.JSONWriteEnabled {
-		resultError(w, influxql.Result{Err: fmt.Errorf("JSON write protocol has been deprecated")}, http.StatusBadRequest)
-		return
-	}
-
-	var bp client.BatchPoints
-	var dec *json.Decoder
-
-	dec = json.NewDecoder(bytes.NewReader(body))
-
-	if err := dec.Decode(&bp); err != nil {
-		if err.Error() == "EOF" {
-			w.WriteHeader(http.StatusOK)
-			return
-		}
-		resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
-		return
-	}
-
-	if bp.Database == "" {
-		resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
-		return
-	}
-
-	if di, err := h.MetaClient.Database(bp.Database); err != nil {
-		resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
-		return
-	} else if di == nil {
-		resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", bp.Database)}, http.StatusNotFound)
-		return
-	}
-
-	if h.requireAuthentication && user == nil {
-		resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", bp.Database)}, http.StatusUnauthorized)
-		return
-	}
-
-	if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, bp.Database) {
-		resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, bp.Database)}, http.StatusUnauthorized)
-		return
-	}
-
-	points, err := NormalizeBatchPoints(bp)
-	if err != nil {
-		resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
-		return
-	}
-
-	// Convert the json batch struct to a points writer struct
-	if err := h.PointsWriter.WritePoints(bp.Database, bp.RetentionPolicy, models.ConsistencyLevelAny, points); err != nil {
-		h.statMap.Add(statPointsWrittenFail, int64(len(points)))
-		if influxdb.IsClientError(err) {
-			resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
-		} else {
-			resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
-		}
-		return
-	}
-	h.statMap.Add(statPointsWrittenOK, int64(len(points)))
-
-	w.WriteHeader(http.StatusNoContent)
-}
-
-// serveWriteLine receives incoming series data in line protocol format and writes it to the database.
-func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) {
-	// Some clients may not set the content-type header appropriately and send JSON with a non-json
-	// content-type. If the body looks JSON, try to handle it as as JSON instead
-	if len(body) > 0 {
-		var i int
-		for {
-			// JSON requests must start w/ an opening bracket
-			if body[i] == '{' {
-				h.serveWriteJSON(w, r, body, user)
-				return
-			}
-
-			// check that the byte is in the standard ascii code range
-			if body[i] > 32 || i >= len(body)-1 {
-				break
-			}
-			i++
-		}
-	}
-
-	precision := r.FormValue("precision")
-	if precision == "" {
-		precision = "n"
-	}
-
-	points, parseError := models.ParsePointsWithPrecision(body, time.Now().UTC(), precision)
+	points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.FormValue("precision"))
 	// Not points parsed correctly so return the error now
 	if parseError != nil && len(points) == 0 {
 		if parseError.Error() == "EOF" {
@@ -522,30 +446,6 @@ func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []
 		return
 	}
 
-	database := r.FormValue("db")
-	if database == "" {
-		resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
-		return
-	}
-
-	if di, err := h.MetaClient.Database(database); err != nil {
-		resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
-		return
-	} else if di == nil {
-		resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
-		return
-	}
-
-	if h.requireAuthentication && user == nil {
-		resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
-		return
-	}
-
-	if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, database) {
-		resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
-		return
-	}
-
 	// Write points.
 	if err := h.PointsWriter.WritePoints(database, r.FormValue("rp"), models.ConsistencyLevelAny, points); influxdb.IsClientError(err) {
 		h.statMap.Add(statPointsWrittenFail, int64(len(points)))
@@ -628,22 +528,6 @@ func MarshalJSON(v interface{}, pretty bool) []byte {
 	return b
 }
 
-// Point represents an InfluxDB point.
-type Point struct {
-	Name   string                 `json:"name"`
-	Time   time.Time              `json:"time"`
-	Tags   map[string]string      `json:"tags"`
-	Fields map[string]interface{} `json:"fields"`
-}
-
-// Batch is a collection of points associated with a database, having a
-// certain retention policy.
-type Batch struct {
-	Database        string  `json:"database"`
-	RetentionPolicy string  `json:"retentionPolicy"`
-	Points          []Point `json:"points"`
-}
-
 // serveExpvar serves registered expvar information over HTTP.
 func serveExpvar(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Content-Type", "application/json; charset=utf-8")
@@ -907,49 +791,3 @@ func (r *Response) Error() error {
 	}
 	return nil
 }
-
-// NormalizeBatchPoints returns a slice of Points, created by populating individual
-// points within the batch, which do not have times or tags, with the top-level
-// values.
-func NormalizeBatchPoints(bp client.BatchPoints) ([]models.Point, error) {
-	points := []models.Point{}
-	for _, p := range bp.Points {
-		if p.Time.IsZero() {
-			if bp.Time.IsZero() {
-				p.Time = time.Now()
-			} else {
-				p.Time = bp.Time
-			}
-		}
-		if p.Precision == "" && bp.Precision != "" {
-			p.Precision = bp.Precision
-		}
-		p.Time = client.SetPrecision(p.Time, p.Precision)
-		if len(bp.Tags) > 0 {
-			if p.Tags == nil {
-				p.Tags = make(map[string]string)
-			}
-			for k := range bp.Tags {
-				if p.Tags[k] == "" {
-					p.Tags[k] = bp.Tags[k]
-				}
-			}
-		}
-
-		if p.Measurement == "" {
-			return points, fmt.Errorf("missing measurement")
-		}
-
-		if len(p.Fields) == 0 {
-			return points, fmt.Errorf("missing fields")
-		}
-		// Need to convert from a client.Point to a influxdb.Point
-		pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
-		if err != nil {
-			return points, err
-		}
-		points = append(points, pt)
-	}
-
-	return points, nil
-}
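With NormalizeBatchPoints gone, parsing in the write path happens only through models.ParsePointsWithPrecision, as the serveWrite hunk above shows. A hedged sketch of that call in isolation (assumes the influxdb/models package of this era; values are placeholders, not taken from the diff):

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/influxdb/models"
)

func main() {
	// Parse a line-protocol body the way serveWrite now does: the second
	// argument is the default timestamp for points that carry none, the third
	// is the precision query parameter ("n" = nanoseconds).
	body := []byte("cpu,host=server02 value=1.0")
	points, err := models.ParsePointsWithPrecision(body, time.Now().UTC(), "n")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, p := range points {
		fmt.Println(p.String())
	}
}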
@@ -2,136 +2,21 @@ package httpd_test
 
 import (
 	"bytes"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
 	"net/http"
 	"net/http/httptest"
-	"reflect"
 	"regexp"
 	"testing"
-	"time"
 
 	"github.com/influxdata/influxdb"
-	"github.com/influxdata/influxdb/client"
 	"github.com/influxdata/influxdb/influxql"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/services/httpd"
 	"github.com/influxdata/influxdb/services/meta"
 )
 
-func TestBatchWrite_UnmarshalEpoch(t *testing.T) {
-	now := time.Now().UTC()
-	tests := []struct {
-		name      string
-		epoch     int64
-		precision string
-		expected  time.Time
-	}{
-		{
-			name:      "nanoseconds",
-			epoch:     now.UnixNano(),
-			precision: "n",
-			expected:  now,
-		},
-		{
-			name:      "microseconds",
-			epoch:     now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond),
-			precision: "u",
-			expected:  now.Round(time.Microsecond),
-		},
-		{
-			name:      "milliseconds",
-			epoch:     now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond),
-			precision: "ms",
-			expected:  now.Round(time.Millisecond),
-		},
-		{
-			name:      "seconds",
-			epoch:     now.Round(time.Second).UnixNano() / int64(time.Second),
-			precision: "s",
-			expected:  now.Round(time.Second),
-		},
-		{
-			name:      "minutes",
-			epoch:     now.Round(time.Minute).UnixNano() / int64(time.Minute),
-			precision: "m",
-			expected:  now.Round(time.Minute),
-		},
-		{
-			name:      "hours",
-			epoch:     now.Round(time.Hour).UnixNano() / int64(time.Hour),
-			precision: "h",
-			expected:  now.Round(time.Hour),
-		},
-		{
-			name:      "max int64",
-			epoch:     9223372036854775807,
-			precision: "n",
-			expected:  time.Unix(0, 9223372036854775807),
-		},
-		{
-			name:      "100 years from now",
-			epoch:     now.Add(time.Hour * 24 * 365 * 100).UnixNano(),
-			precision: "n",
-			expected:  now.Add(time.Hour * 24 * 365 * 100),
-		},
-	}
-
-	for _, test := range tests {
-		t.Logf("testing %q\n", test.name)
-		data := []byte(fmt.Sprintf(`{"time": %d, "precision":"%s"}`, test.epoch, test.precision))
-		t.Logf("json: %s", string(data))
-		var bp client.BatchPoints
-		err := json.Unmarshal(data, &bp)
-		if err != nil {
-			t.Fatalf("unexpected error. expected: %v, actual: %v", nil, err)
-		}
-		if !bp.Time.Equal(test.expected) {
-			t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, bp.Time)
-		}
-	}
-}
-
-func TestBatchWrite_UnmarshalRFC(t *testing.T) {
-	now := time.Now()
-	tests := []struct {
-		name     string
-		rfc      string
-		now      time.Time
-		expected time.Time
-	}{
-		{
-			name:     "RFC3339Nano",
-			rfc:      time.RFC3339Nano,
-			now:      now,
-			expected: now,
-		},
-		{
-			name:     "RFC3339",
-			rfc:      time.RFC3339,
-			now:      now.Round(time.Second),
-			expected: now.Round(time.Second),
-		},
-	}
-
-	for _, test := range tests {
-		t.Logf("testing %q\n", test.name)
-		ts := test.now.Format(test.rfc)
-		data := []byte(fmt.Sprintf(`{"time": %q}`, ts))
-		t.Logf("json: %s", string(data))
-		var bp client.BatchPoints
-		err := json.Unmarshal(data, &bp)
-		if err != nil {
-			t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
-		}
-		if !bp.Time.Equal(test.expected) {
-			t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, bp.Time)
-		}
-	}
-}
-
 // Ensure the handler returns results from a query (including nil results).
 func TestHandler_Query(t *testing.T) {
 	h := NewHandler(false)
@@ -391,71 +276,6 @@ type invalidJSON struct{}
 
 func (*invalidJSON) MarshalJSON() ([]byte, error) { return nil, errors.New("marker") }
 
-func TestNormalizeBatchPoints(t *testing.T) {
-	now := time.Now()
-	tests := []struct {
-		name string
-		bp   client.BatchPoints
-		p    []models.Point
-		err  string
-	}{
-		{
-			name: "default",
-			bp: client.BatchPoints{
-				Points: []client.Point{
-					{Measurement: "cpu", Tags: map[string]string{"region": "useast"}, Time: now, Fields: map[string]interface{}{"value": 1.0}},
-				},
-			},
-			p: []models.Point{
-				models.MustNewPoint("cpu", map[string]string{"region": "useast"}, map[string]interface{}{"value": 1.0}, now),
-			},
-		},
-		{
-			name: "merge time",
-			bp: client.BatchPoints{
-				Time: now,
-				Points: []client.Point{
-					{Measurement: "cpu", Tags: map[string]string{"region": "useast"}, Fields: map[string]interface{}{"value": 1.0}},
-				},
-			},
-			p: []models.Point{
-				models.MustNewPoint("cpu", map[string]string{"region": "useast"}, map[string]interface{}{"value": 1.0}, now),
-			},
-		},
-		{
-			name: "merge tags",
-			bp: client.BatchPoints{
-				Tags: map[string]string{"day": "monday"},
-				Points: []client.Point{
-					{Measurement: "cpu", Tags: map[string]string{"region": "useast"}, Time: now, Fields: map[string]interface{}{"value": 1.0}},
-					{Measurement: "memory", Time: now, Fields: map[string]interface{}{"value": 2.0}},
-				},
-			},
-			p: []models.Point{
-				models.MustNewPoint("cpu", map[string]string{"day": "monday", "region": "useast"}, map[string]interface{}{"value": 1.0}, now),
-				models.MustNewPoint("memory", map[string]string{"day": "monday"}, map[string]interface{}{"value": 2.0}, now),
-			},
-		},
-	}
-
-	for _, test := range tests {
-		t.Logf("running test %q", test.name)
-		p, e := httpd.NormalizeBatchPoints(test.bp)
-		if test.err == "" && e != nil {
-			t.Errorf("unexpected error %v", e)
-		} else if test.err != "" && e == nil {
-			t.Errorf("expected error %s, got <nil>", test.err)
-		} else if e != nil && test.err != e.Error() {
-			t.Errorf("unexpected error. expected: %s, got %v", test.err, e)
-		}
-		if !reflect.DeepEqual(p, test.p) {
-			t.Logf("expected: %+v", test.p)
-			t.Logf("got: %+v", p)
-			t.Error("failed to normalize.")
-		}
-	}
-}
-
 // NewHandler represents a test wrapper for httpd.Handler.
 type Handler struct {
 	*httpd.Handler
@@ -467,7 +287,7 @@ type Handler struct {
 func NewHandler(requireAuthentication bool) *Handler {
 	statMap := influxdb.NewStatistics("httpd", "httpd", nil)
 	h := &Handler{
-		Handler: httpd.NewHandler(requireAuthentication, true, false, false, statMap),
+		Handler: httpd.NewHandler(requireAuthentication, true, false, statMap),
 	}
 	h.Handler.MetaClient = &h.MetaClient
 	h.Handler.QueryExecutor = &h.QueryExecutor
@@ -64,7 +64,6 @@ func NewService(c Config) *Service {
 			c.AuthEnabled,
 			c.LogEnabled,
 			c.WriteTracing,
-			c.JSONWriteEnabled,
 			statMap,
 		),
 		Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),