Remove deprecated JSON write path
parent
0703b85589
commit
d96eef4c52
|
@ -341,60 +341,6 @@ func TestServer_UserCommands(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure the server rejects a single point via json protocol by default.
|
|
||||||
func TestServer_Write_JSON(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
s := OpenServer(NewConfig())
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify writing JSON points returns an error.
|
|
||||||
now := now()
|
|
||||||
res, err := s.Write("", "", fmt.Sprintf(`{"database" : "db0", "retentionPolicy" : "rp0", "points": [{"measurement": "cpu", "tags": {"host": "server02"},"fields": {"value": 1.0}}],"time":"%s"} `, now.Format(time.RFC3339Nano)), nil)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", ``, res)
|
|
||||||
} else if exp := `JSON write protocol has been deprecated`; !strings.Contains(err.Error(), exp) {
|
|
||||||
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify no data has been written.
|
|
||||||
if res, err := s.Query(`SELECT * FROM db0.rp0.cpu GROUP BY *`); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
} else if exp := `{"results":[{}]}`; exp != res {
|
|
||||||
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure the server can create a single point via json protocol and read it back.
|
|
||||||
func TestServer_Write_JSON_Enabled(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
c := NewConfig()
|
|
||||||
c.HTTPD.JSONWriteEnabled = true
|
|
||||||
s := OpenServer(c)
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
now := now()
|
|
||||||
if res, err := s.Write("", "", fmt.Sprintf(`{"database" : "db0", "retentionPolicy" : "rp0", "points": [{"measurement": "cpu", "tags": {"host": "server02"},"fields": {"value": 1.0}}],"time":"%s"} `, now.Format(time.RFC3339Nano)), nil); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
} else if exp := ``; exp != res {
|
|
||||||
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify the data was written.
|
|
||||||
if res, err := s.Query(`SELECT * FROM db0.rp0.cpu GROUP BY *`); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
} else if exp := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server02"},"columns":["time","value"],"values":[["%s",1]]}]}]}`, now.Format(time.RFC3339Nano)); exp != res {
|
|
||||||
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure the server can create a single point via line protocol with float type and read it back.
|
// Ensure the server can create a single point via line protocol with float type and read it back.
|
||||||
func TestServer_Write_LineProtocol_Float(t *testing.T) {
|
func TestServer_Write_LineProtocol_Float(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
|
@ -13,7 +13,6 @@ type Config struct {
|
||||||
PprofEnabled bool `toml:"pprof-enabled"`
|
PprofEnabled bool `toml:"pprof-enabled"`
|
||||||
HTTPSEnabled bool `toml:"https-enabled"`
|
HTTPSEnabled bool `toml:"https-enabled"`
|
||||||
HTTPSCertificate string `toml:"https-certificate"`
|
HTTPSCertificate string `toml:"https-certificate"`
|
||||||
JSONWriteEnabled bool `toml:"json-write-enabled"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfig returns a new Config with default settings.
|
// NewConfig returns a new Config with default settings.
|
||||||
|
@ -24,6 +23,5 @@ func NewConfig() Config {
|
||||||
LogEnabled: true,
|
LogEnabled: true,
|
||||||
HTTPSEnabled: false,
|
HTTPSEnabled: false,
|
||||||
HTTPSCertificate: "/etc/ssl/influxdb.pem",
|
HTTPSCertificate: "/etc/ssl/influxdb.pem",
|
||||||
JSONWriteEnabled: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@ import (
|
||||||
|
|
||||||
"github.com/bmizerany/pat"
|
"github.com/bmizerany/pat"
|
||||||
"github.com/influxdata/influxdb"
|
"github.com/influxdata/influxdb"
|
||||||
"github.com/influxdata/influxdb/client"
|
|
||||||
"github.com/influxdata/influxdb/influxql"
|
"github.com/influxdata/influxdb/influxql"
|
||||||
"github.com/influxdata/influxdb/models"
|
"github.com/influxdata/influxdb/models"
|
||||||
"github.com/influxdata/influxdb/services/continuous_querier"
|
"github.com/influxdata/influxdb/services/continuous_querier"
|
||||||
|
@ -73,22 +72,20 @@ type Handler struct {
|
||||||
|
|
||||||
ContinuousQuerier continuous_querier.ContinuousQuerier
|
ContinuousQuerier continuous_querier.ContinuousQuerier
|
||||||
|
|
||||||
Logger *log.Logger
|
Logger *log.Logger
|
||||||
loggingEnabled bool // Log every HTTP access.
|
loggingEnabled bool // Log every HTTP access.
|
||||||
WriteTrace bool // Detailed logging of write path
|
WriteTrace bool // Detailed logging of write path
|
||||||
JSONWriteEnabled bool // Allow JSON writes
|
statMap *expvar.Map
|
||||||
statMap *expvar.Map
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHandler returns a new instance of handler with routes.
|
// NewHandler returns a new instance of handler with routes.
|
||||||
func NewHandler(requireAuthentication, loggingEnabled, writeTrace, JSONWriteEnabled bool, statMap *expvar.Map) *Handler {
|
func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap *expvar.Map) *Handler {
|
||||||
h := &Handler{
|
h := &Handler{
|
||||||
mux: pat.New(),
|
mux: pat.New(),
|
||||||
requireAuthentication: requireAuthentication,
|
requireAuthentication: requireAuthentication,
|
||||||
Logger: log.New(os.Stderr, "[http] ", log.LstdFlags),
|
Logger: log.New(os.Stderr, "[http] ", log.LstdFlags),
|
||||||
loggingEnabled: loggingEnabled,
|
loggingEnabled: loggingEnabled,
|
||||||
WriteTrace: writeTrace,
|
WriteTrace: writeTrace,
|
||||||
JSONWriteEnabled: JSONWriteEnabled,
|
|
||||||
statMap: statMap,
|
statMap: statMap,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -371,12 +368,37 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serveWrite receives incoming series data in line protocol format and writes it to the database.
|
||||||
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
||||||
h.statMap.Add(statWriteRequest, 1)
|
h.statMap.Add(statWriteRequest, 1)
|
||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
h.statMap.Add(statWriteRequestDuration, time.Since(start).Nanoseconds())
|
h.statMap.Add(statWriteRequestDuration, time.Since(start).Nanoseconds())
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
|
database := r.FormValue("db")
|
||||||
|
if database == "" {
|
||||||
|
resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if di, err := h.MetaClient.Database(database); err != nil {
|
||||||
|
resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
} else if di == nil {
|
||||||
|
resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.requireAuthentication && user == nil {
|
||||||
|
resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, database) {
|
||||||
|
resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Handle gzip decoding of the body
|
// Handle gzip decoding of the body
|
||||||
body := r.Body
|
body := r.Body
|
||||||
if r.Header.Get("Content-encoding") == "gzip" {
|
if r.Header.Get("Content-encoding") == "gzip" {
|
||||||
|
@ -413,105 +435,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
h.Logger.Printf("write body received by handler: %s", buf.Bytes())
|
h.Logger.Printf("write body received by handler: %s", buf.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.Header.Get("Content-Type") == "application/json" {
|
points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.FormValue("precision"))
|
||||||
h.serveWriteJSON(w, r, buf.Bytes(), user)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.serveWriteLine(w, r, buf.Bytes(), user)
|
|
||||||
}
|
|
||||||
|
|
||||||
// serveWriteJSON receives incoming series data in JSON and writes it to the database.
|
|
||||||
func (h *Handler) serveWriteJSON(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) {
|
|
||||||
if !h.JSONWriteEnabled {
|
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("JSON write protocol has been deprecated")}, http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var bp client.BatchPoints
|
|
||||||
var dec *json.Decoder
|
|
||||||
|
|
||||||
dec = json.NewDecoder(bytes.NewReader(body))
|
|
||||||
|
|
||||||
if err := dec.Decode(&bp); err != nil {
|
|
||||||
if err.Error() == "EOF" {
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if bp.Database == "" {
|
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if di, err := h.MetaClient.Database(bp.Database); err != nil {
|
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
} else if di == nil {
|
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", bp.Database)}, http.StatusNotFound)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if h.requireAuthentication && user == nil {
|
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", bp.Database)}, http.StatusUnauthorized)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, bp.Database) {
|
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, bp.Database)}, http.StatusUnauthorized)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
points, err := NormalizeBatchPoints(bp)
|
|
||||||
if err != nil {
|
|
||||||
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert the json batch struct to a points writer struct
|
|
||||||
if err := h.PointsWriter.WritePoints(bp.Database, bp.RetentionPolicy, models.ConsistencyLevelAny, points); err != nil {
|
|
||||||
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
|
|
||||||
if influxdb.IsClientError(err) {
|
|
||||||
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
|
||||||
} else {
|
|
||||||
resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
|
|
||||||
|
|
||||||
w.WriteHeader(http.StatusNoContent)
|
|
||||||
}
|
|
||||||
|
|
||||||
// serveWriteLine receives incoming series data in line protocol format and writes it to the database.
|
|
||||||
func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []byte, user *meta.UserInfo) {
|
|
||||||
// Some clients may not set the content-type header appropriately and send JSON with a non-json
|
|
||||||
// content-type. If the body looks JSON, try to handle it as as JSON instead
|
|
||||||
if len(body) > 0 {
|
|
||||||
var i int
|
|
||||||
for {
|
|
||||||
// JSON requests must start w/ an opening bracket
|
|
||||||
if body[i] == '{' {
|
|
||||||
h.serveWriteJSON(w, r, body, user)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// check that the byte is in the standard ascii code range
|
|
||||||
if body[i] > 32 || i >= len(body)-1 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
precision := r.FormValue("precision")
|
|
||||||
if precision == "" {
|
|
||||||
precision = "n"
|
|
||||||
}
|
|
||||||
|
|
||||||
points, parseError := models.ParsePointsWithPrecision(body, time.Now().UTC(), precision)
|
|
||||||
// Not points parsed correctly so return the error now
|
// Not points parsed correctly so return the error now
|
||||||
if parseError != nil && len(points) == 0 {
|
if parseError != nil && len(points) == 0 {
|
||||||
if parseError.Error() == "EOF" {
|
if parseError.Error() == "EOF" {
|
||||||
|
@ -522,30 +446,6 @@ func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
database := r.FormValue("db")
|
|
||||||
if database == "" {
|
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if di, err := h.MetaClient.Database(database); err != nil {
|
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
} else if di == nil {
|
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if h.requireAuthentication && user == nil {
|
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, database) {
|
|
||||||
resultError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write points.
|
// Write points.
|
||||||
if err := h.PointsWriter.WritePoints(database, r.FormValue("rp"), models.ConsistencyLevelAny, points); influxdb.IsClientError(err) {
|
if err := h.PointsWriter.WritePoints(database, r.FormValue("rp"), models.ConsistencyLevelAny, points); influxdb.IsClientError(err) {
|
||||||
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
|
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
|
||||||
|
@ -628,22 +528,6 @@ func MarshalJSON(v interface{}, pretty bool) []byte {
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
// Point represents an InfluxDB point.
|
|
||||||
type Point struct {
|
|
||||||
Name string `json:"name"`
|
|
||||||
Time time.Time `json:"time"`
|
|
||||||
Tags map[string]string `json:"tags"`
|
|
||||||
Fields map[string]interface{} `json:"fields"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Batch is a collection of points associated with a database, having a
|
|
||||||
// certain retention policy.
|
|
||||||
type Batch struct {
|
|
||||||
Database string `json:"database"`
|
|
||||||
RetentionPolicy string `json:"retentionPolicy"`
|
|
||||||
Points []Point `json:"points"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// serveExpvar serves registered expvar information over HTTP.
|
// serveExpvar serves registered expvar information over HTTP.
|
||||||
func serveExpvar(w http.ResponseWriter, r *http.Request) {
|
func serveExpvar(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||||
|
@ -907,49 +791,3 @@ func (r *Response) Error() error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NormalizeBatchPoints returns a slice of Points, created by populating individual
|
|
||||||
// points within the batch, which do not have times or tags, with the top-level
|
|
||||||
// values.
|
|
||||||
func NormalizeBatchPoints(bp client.BatchPoints) ([]models.Point, error) {
|
|
||||||
points := []models.Point{}
|
|
||||||
for _, p := range bp.Points {
|
|
||||||
if p.Time.IsZero() {
|
|
||||||
if bp.Time.IsZero() {
|
|
||||||
p.Time = time.Now()
|
|
||||||
} else {
|
|
||||||
p.Time = bp.Time
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if p.Precision == "" && bp.Precision != "" {
|
|
||||||
p.Precision = bp.Precision
|
|
||||||
}
|
|
||||||
p.Time = client.SetPrecision(p.Time, p.Precision)
|
|
||||||
if len(bp.Tags) > 0 {
|
|
||||||
if p.Tags == nil {
|
|
||||||
p.Tags = make(map[string]string)
|
|
||||||
}
|
|
||||||
for k := range bp.Tags {
|
|
||||||
if p.Tags[k] == "" {
|
|
||||||
p.Tags[k] = bp.Tags[k]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.Measurement == "" {
|
|
||||||
return points, fmt.Errorf("missing measurement")
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(p.Fields) == 0 {
|
|
||||||
return points, fmt.Errorf("missing fields")
|
|
||||||
}
|
|
||||||
// Need to convert from a client.Point to a influxdb.Point
|
|
||||||
pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
|
|
||||||
if err != nil {
|
|
||||||
return points, err
|
|
||||||
}
|
|
||||||
points = append(points, pt)
|
|
||||||
}
|
|
||||||
|
|
||||||
return points, nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -2,136 +2,21 @@ package httpd_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"reflect"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb"
|
"github.com/influxdata/influxdb"
|
||||||
"github.com/influxdata/influxdb/client"
|
|
||||||
"github.com/influxdata/influxdb/influxql"
|
"github.com/influxdata/influxdb/influxql"
|
||||||
"github.com/influxdata/influxdb/models"
|
"github.com/influxdata/influxdb/models"
|
||||||
"github.com/influxdata/influxdb/services/httpd"
|
"github.com/influxdata/influxdb/services/httpd"
|
||||||
"github.com/influxdata/influxdb/services/meta"
|
"github.com/influxdata/influxdb/services/meta"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestBatchWrite_UnmarshalEpoch(t *testing.T) {
|
|
||||||
now := time.Now().UTC()
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
epoch int64
|
|
||||||
precision string
|
|
||||||
expected time.Time
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "nanoseconds",
|
|
||||||
epoch: now.UnixNano(),
|
|
||||||
precision: "n",
|
|
||||||
expected: now,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "microseconds",
|
|
||||||
epoch: now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond),
|
|
||||||
precision: "u",
|
|
||||||
expected: now.Round(time.Microsecond),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "milliseconds",
|
|
||||||
epoch: now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond),
|
|
||||||
precision: "ms",
|
|
||||||
expected: now.Round(time.Millisecond),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "seconds",
|
|
||||||
epoch: now.Round(time.Second).UnixNano() / int64(time.Second),
|
|
||||||
precision: "s",
|
|
||||||
expected: now.Round(time.Second),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "minutes",
|
|
||||||
epoch: now.Round(time.Minute).UnixNano() / int64(time.Minute),
|
|
||||||
precision: "m",
|
|
||||||
expected: now.Round(time.Minute),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "hours",
|
|
||||||
epoch: now.Round(time.Hour).UnixNano() / int64(time.Hour),
|
|
||||||
precision: "h",
|
|
||||||
expected: now.Round(time.Hour),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "max int64",
|
|
||||||
epoch: 9223372036854775807,
|
|
||||||
precision: "n",
|
|
||||||
expected: time.Unix(0, 9223372036854775807),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "100 years from now",
|
|
||||||
epoch: now.Add(time.Hour * 24 * 365 * 100).UnixNano(),
|
|
||||||
precision: "n",
|
|
||||||
expected: now.Add(time.Hour * 24 * 365 * 100),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
t.Logf("testing %q\n", test.name)
|
|
||||||
data := []byte(fmt.Sprintf(`{"time": %d, "precision":"%s"}`, test.epoch, test.precision))
|
|
||||||
t.Logf("json: %s", string(data))
|
|
||||||
var bp client.BatchPoints
|
|
||||||
err := json.Unmarshal(data, &bp)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error. expected: %v, actual: %v", nil, err)
|
|
||||||
}
|
|
||||||
if !bp.Time.Equal(test.expected) {
|
|
||||||
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, bp.Time)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchWrite_UnmarshalRFC(t *testing.T) {
|
|
||||||
now := time.Now()
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
rfc string
|
|
||||||
now time.Time
|
|
||||||
expected time.Time
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "RFC3339Nano",
|
|
||||||
rfc: time.RFC3339Nano,
|
|
||||||
now: now,
|
|
||||||
expected: now,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "RFC3339",
|
|
||||||
rfc: time.RFC3339,
|
|
||||||
now: now.Round(time.Second),
|
|
||||||
expected: now.Round(time.Second),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
t.Logf("testing %q\n", test.name)
|
|
||||||
ts := test.now.Format(test.rfc)
|
|
||||||
data := []byte(fmt.Sprintf(`{"time": %q}`, ts))
|
|
||||||
t.Logf("json: %s", string(data))
|
|
||||||
var bp client.BatchPoints
|
|
||||||
err := json.Unmarshal(data, &bp)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
|
|
||||||
}
|
|
||||||
if !bp.Time.Equal(test.expected) {
|
|
||||||
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, bp.Time)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure the handler returns results from a query (including nil results).
|
// Ensure the handler returns results from a query (including nil results).
|
||||||
func TestHandler_Query(t *testing.T) {
|
func TestHandler_Query(t *testing.T) {
|
||||||
h := NewHandler(false)
|
h := NewHandler(false)
|
||||||
|
@ -391,71 +276,6 @@ type invalidJSON struct{}
|
||||||
|
|
||||||
func (*invalidJSON) MarshalJSON() ([]byte, error) { return nil, errors.New("marker") }
|
func (*invalidJSON) MarshalJSON() ([]byte, error) { return nil, errors.New("marker") }
|
||||||
|
|
||||||
func TestNormalizeBatchPoints(t *testing.T) {
|
|
||||||
now := time.Now()
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
bp client.BatchPoints
|
|
||||||
p []models.Point
|
|
||||||
err string
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "default",
|
|
||||||
bp: client.BatchPoints{
|
|
||||||
Points: []client.Point{
|
|
||||||
{Measurement: "cpu", Tags: map[string]string{"region": "useast"}, Time: now, Fields: map[string]interface{}{"value": 1.0}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
p: []models.Point{
|
|
||||||
models.MustNewPoint("cpu", map[string]string{"region": "useast"}, map[string]interface{}{"value": 1.0}, now),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "merge time",
|
|
||||||
bp: client.BatchPoints{
|
|
||||||
Time: now,
|
|
||||||
Points: []client.Point{
|
|
||||||
{Measurement: "cpu", Tags: map[string]string{"region": "useast"}, Fields: map[string]interface{}{"value": 1.0}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
p: []models.Point{
|
|
||||||
models.MustNewPoint("cpu", map[string]string{"region": "useast"}, map[string]interface{}{"value": 1.0}, now),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "merge tags",
|
|
||||||
bp: client.BatchPoints{
|
|
||||||
Tags: map[string]string{"day": "monday"},
|
|
||||||
Points: []client.Point{
|
|
||||||
{Measurement: "cpu", Tags: map[string]string{"region": "useast"}, Time: now, Fields: map[string]interface{}{"value": 1.0}},
|
|
||||||
{Measurement: "memory", Time: now, Fields: map[string]interface{}{"value": 2.0}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
p: []models.Point{
|
|
||||||
models.MustNewPoint("cpu", map[string]string{"day": "monday", "region": "useast"}, map[string]interface{}{"value": 1.0}, now),
|
|
||||||
models.MustNewPoint("memory", map[string]string{"day": "monday"}, map[string]interface{}{"value": 2.0}, now),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
t.Logf("running test %q", test.name)
|
|
||||||
p, e := httpd.NormalizeBatchPoints(test.bp)
|
|
||||||
if test.err == "" && e != nil {
|
|
||||||
t.Errorf("unexpected error %v", e)
|
|
||||||
} else if test.err != "" && e == nil {
|
|
||||||
t.Errorf("expected error %s, got <nil>", test.err)
|
|
||||||
} else if e != nil && test.err != e.Error() {
|
|
||||||
t.Errorf("unexpected error. expected: %s, got %v", test.err, e)
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(p, test.p) {
|
|
||||||
t.Logf("expected: %+v", test.p)
|
|
||||||
t.Logf("got: %+v", p)
|
|
||||||
t.Error("failed to normalize.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewHandler represents a test wrapper for httpd.Handler.
|
// NewHandler represents a test wrapper for httpd.Handler.
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
*httpd.Handler
|
*httpd.Handler
|
||||||
|
@ -467,7 +287,7 @@ type Handler struct {
|
||||||
func NewHandler(requireAuthentication bool) *Handler {
|
func NewHandler(requireAuthentication bool) *Handler {
|
||||||
statMap := influxdb.NewStatistics("httpd", "httpd", nil)
|
statMap := influxdb.NewStatistics("httpd", "httpd", nil)
|
||||||
h := &Handler{
|
h := &Handler{
|
||||||
Handler: httpd.NewHandler(requireAuthentication, true, false, false, statMap),
|
Handler: httpd.NewHandler(requireAuthentication, true, false, statMap),
|
||||||
}
|
}
|
||||||
h.Handler.MetaClient = &h.MetaClient
|
h.Handler.MetaClient = &h.MetaClient
|
||||||
h.Handler.QueryExecutor = &h.QueryExecutor
|
h.Handler.QueryExecutor = &h.QueryExecutor
|
||||||
|
|
|
@ -64,7 +64,6 @@ func NewService(c Config) *Service {
|
||||||
c.AuthEnabled,
|
c.AuthEnabled,
|
||||||
c.LogEnabled,
|
c.LogEnabled,
|
||||||
c.WriteTracing,
|
c.WriteTracing,
|
||||||
c.JSONWriteEnabled,
|
|
||||||
statMap,
|
statMap,
|
||||||
),
|
),
|
||||||
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
|
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
|
||||||
|
|
Loading…
Reference in New Issue