Merge pull request #1448 from influxdb/http-timestamp-variants
Support epoch time with precision, RFC3339, RFC3339nano formats for writing via httppull/1459/head
commit
59bd3c1a64
|
@ -3,11 +3,13 @@ package client
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
|
@ -31,7 +33,7 @@ type Query struct {
|
|||
type Write struct {
|
||||
Database string
|
||||
RetentionPolicy string
|
||||
Points []influxdb.Point
|
||||
Points []Point
|
||||
}
|
||||
|
||||
func NewClient(c Config) (*Client, error) {
|
||||
|
@ -44,7 +46,7 @@ func NewClient(c Config) (*Client, error) {
|
|||
return &client, nil
|
||||
}
|
||||
|
||||
func (c *Client) Query(q Query) (*influxdb.Results, error) {
|
||||
func (c *Client) Query(q Query) (*Results, error) {
|
||||
u := c.url
|
||||
|
||||
u.Path = "query"
|
||||
|
@ -59,7 +61,7 @@ func (c *Client) Query(q Query) (*influxdb.Results, error) {
|
|||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var results influxdb.Results
|
||||
var results Results
|
||||
err = json.NewDecoder(resp.Body).Decode(&results)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -67,12 +69,12 @@ func (c *Client) Query(q Query) (*influxdb.Results, error) {
|
|||
return &results, nil
|
||||
}
|
||||
|
||||
func (c *Client) Write(writes ...Write) (*influxdb.Results, error) {
|
||||
func (c *Client) Write(writes ...Write) (*Results, error) {
|
||||
c.url.Path = "write"
|
||||
type data struct {
|
||||
Points []influxdb.Point `json:"points"`
|
||||
Database string `json:"database"`
|
||||
RetentionPolicy string `json:"retentionPolicy"`
|
||||
Points []Point `json:"points"`
|
||||
Database string `json:"database"`
|
||||
RetentionPolicy string `json:"retentionPolicy"`
|
||||
}
|
||||
|
||||
d := []data{}
|
||||
|
@ -89,7 +91,7 @@ func (c *Client) Write(writes ...Write) (*influxdb.Results, error) {
|
|||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var results influxdb.Results
|
||||
var results Results
|
||||
err = json.NewDecoder(resp.Body).Decode(&results)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -109,27 +111,225 @@ func (c *Client) Ping() (time.Duration, string, error) {
|
|||
return time.Since(now), version, nil
|
||||
}
|
||||
|
||||
// Structs
|
||||
|
||||
// Result represents a resultset returned from a single statement.
|
||||
type Result struct {
|
||||
Rows []*influxql.Row
|
||||
Err error
|
||||
}
|
||||
|
||||
// MarshalJSON encodes the result into JSON.
|
||||
func (r *Result) MarshalJSON() ([]byte, error) {
|
||||
// Define a struct that outputs "error" as a string.
|
||||
var o struct {
|
||||
Rows []*influxql.Row `json:"rows,omitempty"`
|
||||
Err string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// Copy fields to output struct.
|
||||
o.Rows = r.Rows
|
||||
if r.Err != nil {
|
||||
o.Err = r.Err.Error()
|
||||
}
|
||||
|
||||
return json.Marshal(&o)
|
||||
}
|
||||
|
||||
// UnmarshalJSON decodes the data into the Result struct
|
||||
func (r *Result) UnmarshalJSON(b []byte) error {
|
||||
var o struct {
|
||||
Rows []*influxql.Row `json:"rows,omitempty"`
|
||||
Err string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
err := json.Unmarshal(b, &o)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Rows = o.Rows
|
||||
if o.Err != "" {
|
||||
r.Err = errors.New(o.Err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Results represents a list of statement results.
|
||||
type Results struct {
|
||||
Results []*Result
|
||||
Err error
|
||||
}
|
||||
|
||||
func (r Results) MarshalJSON() ([]byte, error) {
|
||||
// Define a struct that outputs "error" as a string.
|
||||
var o struct {
|
||||
Results []*Result `json:"results,omitempty"`
|
||||
Err string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// Copy fields to output struct.
|
||||
o.Results = r.Results
|
||||
if r.Err != nil {
|
||||
o.Err = r.Err.Error()
|
||||
}
|
||||
|
||||
return json.Marshal(&o)
|
||||
}
|
||||
|
||||
// UnmarshalJSON decodes the data into the Results struct
|
||||
func (r *Results) UnmarshalJSON(b []byte) error {
|
||||
var o struct {
|
||||
Results []*Result `json:"results,omitempty"`
|
||||
Err string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
err := json.Unmarshal(b, &o)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Results = o.Results
|
||||
if o.Err != "" {
|
||||
r.Err = errors.New(o.Err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Error returns the first error from any statement.
|
||||
// Returns nil if no errors occurred on any statements.
|
||||
func (a Results) Error() error {
|
||||
for _, r := range a.Results {
|
||||
if r.Err != nil {
|
||||
return r.Err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Timestamp is a custom type so we marshal JSON properly into UTC nanosecond time
|
||||
type Timestamp time.Time
|
||||
|
||||
// Time returns the time represented by the Timestamp
|
||||
func (t Timestamp) Time() time.Time {
|
||||
return time.Time(t)
|
||||
}
|
||||
|
||||
// MarshalJSON returns time in UTC with nanoseconds
|
||||
func (t Timestamp) MarshalJSON() ([]byte, error) {
|
||||
// Always send back in UTC with nanoseconds
|
||||
s := t.Time().UTC().Format(time.RFC3339Nano)
|
||||
return []byte(`"` + s + `"`), nil
|
||||
}
|
||||
|
||||
// Point defines the values that will be written to the database
|
||||
type Point struct {
|
||||
Name string `json:"name"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
Timestamp Timestamp `json:"timestamp"`
|
||||
Values map[string]interface{} `json:"values"`
|
||||
Precision string `json:"precision"`
|
||||
}
|
||||
|
||||
// UnmarshalJSON decodes the data into the Point struct
|
||||
func (p *Point) UnmarshalJSON(b []byte) error {
|
||||
var normal struct {
|
||||
Name string `json:"name"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Precision string `json:"precision"`
|
||||
Values map[string]interface{} `json:"values"`
|
||||
}
|
||||
var epoch struct {
|
||||
Name string `json:"name"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Precision string `json:"precision"`
|
||||
Values map[string]interface{} `json:"values"`
|
||||
}
|
||||
|
||||
if err := func() error {
|
||||
var err error
|
||||
if err = json.Unmarshal(b, &epoch); err != nil {
|
||||
return err
|
||||
}
|
||||
// Convert from epoch to time.Time
|
||||
ts, err := EpochToTime(epoch.Timestamp, epoch.Precision)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.Name = epoch.Name
|
||||
p.Tags = epoch.Tags
|
||||
p.Timestamp = Timestamp(ts)
|
||||
p.Precision = epoch.Precision
|
||||
p.Values = epoch.Values
|
||||
return nil
|
||||
}(); err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(b, &normal); err != nil {
|
||||
return err
|
||||
}
|
||||
normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision)
|
||||
p.Name = normal.Name
|
||||
p.Tags = normal.Tags
|
||||
p.Timestamp = Timestamp(normal.Timestamp)
|
||||
p.Precision = normal.Precision
|
||||
p.Values = normal.Values
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// utility functions
|
||||
|
||||
func (c *Client) Addr() string {
|
||||
return c.url.String()
|
||||
}
|
||||
|
||||
//func (c *Client) urlFor(path string) (*url.URL, error) {
|
||||
//var u *url.URL
|
||||
//u, err := url.Parse(fmt.Sprintf("%s%s", c.addr, path))
|
||||
//if err != nil {
|
||||
//return nil, err
|
||||
//}
|
||||
//if c.username != "" {
|
||||
//u.User = url.UserPassword(c.username, c.password)
|
||||
//}
|
||||
//u.Scheme = "http"
|
||||
//return u, nil
|
||||
//}
|
||||
|
||||
// helper functions
|
||||
|
||||
// EpochToTime takes a unix epoch time and uses precision to return back a time.Time
|
||||
func EpochToTime(epoch int64, precision string) (time.Time, error) {
|
||||
if precision == "" {
|
||||
precision = "s"
|
||||
}
|
||||
var t time.Time
|
||||
switch precision {
|
||||
case "h":
|
||||
t = time.Unix(0, epoch*int64(time.Hour))
|
||||
case "m":
|
||||
t = time.Unix(0, epoch*int64(time.Minute))
|
||||
case "s":
|
||||
t = time.Unix(0, epoch*int64(time.Second))
|
||||
case "ms":
|
||||
t = time.Unix(0, epoch*int64(time.Millisecond))
|
||||
case "u":
|
||||
t = time.Unix(0, epoch*int64(time.Microsecond))
|
||||
case "n":
|
||||
t = time.Unix(0, epoch)
|
||||
default:
|
||||
return time.Time{}, fmt.Errorf("Unknowm precision %q", precision)
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// SetPrecision will round a time to the specified precision
|
||||
func SetPrecision(t time.Time, precision string) time.Time {
|
||||
switch precision {
|
||||
case "n":
|
||||
case "u":
|
||||
return t.Round(time.Microsecond)
|
||||
case "ms":
|
||||
return t.Round(time.Millisecond)
|
||||
case "s":
|
||||
return t.Round(time.Second)
|
||||
case "m":
|
||||
return t.Round(time.Minute)
|
||||
case "h":
|
||||
return t.Round(time.Hour)
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
func detect(values ...string) string {
|
||||
for _, v := range values {
|
||||
if v != "" {
|
||||
|
|
|
@ -2,10 +2,12 @@ package client_test
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/client"
|
||||
|
@ -116,6 +118,146 @@ func TestClient_Write(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestPoint_UnmarshalEpoch(t *testing.T) {
|
||||
now := time.Now()
|
||||
tests := []struct {
|
||||
name string
|
||||
epoch int64
|
||||
precision string
|
||||
expected time.Time
|
||||
}{
|
||||
{
|
||||
name: "nanoseconds",
|
||||
epoch: now.UnixNano(),
|
||||
precision: "n",
|
||||
expected: now,
|
||||
},
|
||||
{
|
||||
name: "microseconds",
|
||||
epoch: now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond),
|
||||
precision: "u",
|
||||
expected: now.Round(time.Microsecond),
|
||||
},
|
||||
{
|
||||
name: "milliseconds",
|
||||
epoch: now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond),
|
||||
precision: "ms",
|
||||
expected: now.Round(time.Millisecond),
|
||||
},
|
||||
{
|
||||
name: "seconds",
|
||||
epoch: now.Round(time.Second).UnixNano() / int64(time.Second),
|
||||
precision: "s",
|
||||
expected: now.Round(time.Second),
|
||||
},
|
||||
{
|
||||
name: "minutes",
|
||||
epoch: now.Round(time.Minute).UnixNano() / int64(time.Minute),
|
||||
precision: "m",
|
||||
expected: now.Round(time.Minute),
|
||||
},
|
||||
{
|
||||
name: "hours",
|
||||
epoch: now.Round(time.Hour).UnixNano() / int64(time.Hour),
|
||||
precision: "h",
|
||||
expected: now.Round(time.Hour),
|
||||
},
|
||||
{
|
||||
name: "max int64",
|
||||
epoch: 9223372036854775807,
|
||||
precision: "n",
|
||||
expected: time.Unix(0, 9223372036854775807),
|
||||
},
|
||||
{
|
||||
name: "100 years from now",
|
||||
epoch: now.Add(time.Hour * 24 * 365 * 100).UnixNano(),
|
||||
precision: "n",
|
||||
expected: now.Add(time.Hour * 24 * 365 * 100),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Logf("testing %q\n", test.name)
|
||||
data := []byte(fmt.Sprintf(`{"timestamp": %d, "precision":"%s"}`, test.epoch, test.precision))
|
||||
t.Logf("json: %s", string(data))
|
||||
var p client.Point
|
||||
err := json.Unmarshal(data, &p)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
|
||||
}
|
||||
if p.Timestamp.Time() != test.expected {
|
||||
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoint_UnmarshalRFC(t *testing.T) {
|
||||
now := time.Now()
|
||||
tests := []struct {
|
||||
name string
|
||||
rfc string
|
||||
now time.Time
|
||||
expected time.Time
|
||||
}{
|
||||
{
|
||||
name: "RFC3339Nano",
|
||||
rfc: time.RFC3339Nano,
|
||||
now: now,
|
||||
expected: now,
|
||||
},
|
||||
{
|
||||
name: "RFC3339",
|
||||
rfc: time.RFC3339,
|
||||
now: now.Round(time.Second),
|
||||
expected: now.Round(time.Second),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Logf("testing %q\n", test.name)
|
||||
ts := test.now.Format(test.rfc)
|
||||
data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts))
|
||||
t.Logf("json: %s", string(data))
|
||||
var p client.Point
|
||||
err := json.Unmarshal(data, &p)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
|
||||
}
|
||||
if p.Timestamp.Time() != test.expected {
|
||||
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEpochToTime(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
epoch int64
|
||||
precision string
|
||||
expected time.Time
|
||||
}{
|
||||
{name: "nanoseconds", epoch: now.UnixNano(), precision: "n", expected: now},
|
||||
{name: "microseconds", epoch: now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond), precision: "u", expected: now.Round(time.Microsecond)},
|
||||
{name: "milliseconds", epoch: now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond), precision: "ms", expected: now.Round(time.Millisecond)},
|
||||
{name: "seconds", epoch: now.Round(time.Second).UnixNano() / int64(time.Second), precision: "s", expected: now.Round(time.Second)},
|
||||
{name: "minutes", epoch: now.Round(time.Minute).UnixNano() / int64(time.Minute), precision: "m", expected: now.Round(time.Minute)},
|
||||
{name: "hours", epoch: now.Round(time.Hour).UnixNano() / int64(time.Hour), precision: "h", expected: now.Round(time.Hour)},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Logf("testing %q\n", test.name)
|
||||
tm, e := client.EpochToTime(test.epoch, test.precision)
|
||||
if e != nil {
|
||||
t.Fatalf("unexpected error: expected %v, actual: %v", nil, e)
|
||||
}
|
||||
if tm != test.expected {
|
||||
t.Fatalf("unexpected time: expected %v, actual %v", test.expected, tm)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// helper functions
|
||||
|
||||
func emptyTestServer() *httptest.Server {
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
|
||||
"github.com/bmizerany/pat"
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/client"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
)
|
||||
|
||||
|
@ -140,17 +141,73 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ
|
|||
httpResults(w, results, pretty)
|
||||
}
|
||||
|
||||
type batchWrite struct {
|
||||
Points []influxdb.Point `json:"points"`
|
||||
// BatchWrite is used to send batch write data to the http /write endpoint
|
||||
type BatchWrite struct {
|
||||
Points []client.Point `json:"points"`
|
||||
Database string `json:"database"`
|
||||
RetentionPolicy string `json:"retentionPolicy"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Precision string `json:"precision"`
|
||||
}
|
||||
|
||||
// UnmarshalJSON decodes the data into the batchWrite struct
|
||||
func (br *BatchWrite) UnmarshalJSON(b []byte) error {
|
||||
var normal struct {
|
||||
Points []client.Point `json:"points"`
|
||||
Database string `json:"database"`
|
||||
RetentionPolicy string `json:"retentionPolicy"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Precision string `json:"precision"`
|
||||
}
|
||||
var epoch struct {
|
||||
Points []client.Point `json:"points"`
|
||||
Database string `json:"database"`
|
||||
RetentionPolicy string `json:"retentionPolicy"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Precision string `json:"precision"`
|
||||
}
|
||||
|
||||
if err := func() error {
|
||||
var err error
|
||||
if err = json.Unmarshal(b, &epoch); err != nil {
|
||||
return err
|
||||
}
|
||||
// Convert from epoch to time.Time
|
||||
ts, err := client.EpochToTime(epoch.Timestamp, epoch.Precision)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
br.Points = epoch.Points
|
||||
br.Database = epoch.Database
|
||||
br.RetentionPolicy = epoch.RetentionPolicy
|
||||
br.Tags = epoch.Tags
|
||||
br.Timestamp = ts
|
||||
br.Precision = epoch.Precision
|
||||
return nil
|
||||
}(); err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(b, &normal); err != nil {
|
||||
return err
|
||||
}
|
||||
normal.Timestamp = client.SetPrecision(normal.Timestamp, normal.Precision)
|
||||
br.Points = normal.Points
|
||||
br.Database = normal.Database
|
||||
br.RetentionPolicy = normal.RetentionPolicy
|
||||
br.Tags = normal.Tags
|
||||
br.Timestamp = normal.Timestamp
|
||||
br.Precision = normal.Precision
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// serveWrite receives incoming series data and writes it to the database.
|
||||
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
|
||||
var br batchWrite
|
||||
var br BatchWrite
|
||||
|
||||
dec := json.NewDecoder(r.Body)
|
||||
|
||||
|
@ -187,9 +244,13 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ
|
|||
}
|
||||
|
||||
for _, p := range br.Points {
|
||||
if p.Timestamp.IsZero() {
|
||||
p.Timestamp = br.Timestamp
|
||||
if p.Timestamp.Time().IsZero() {
|
||||
p.Timestamp = client.Timestamp(br.Timestamp)
|
||||
}
|
||||
if p.Precision == "" && br.Precision != "" {
|
||||
p.Precision = br.Precision
|
||||
}
|
||||
p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision))
|
||||
if len(br.Tags) > 0 {
|
||||
for k, _ := range br.Tags {
|
||||
if p.Tags[k] == "" {
|
||||
|
@ -197,7 +258,14 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ
|
|||
}
|
||||
}
|
||||
}
|
||||
if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []influxdb.Point{p}); err != nil {
|
||||
// Need to convert from a client.Point to a influxdb.Point
|
||||
iPoint := influxdb.Point{
|
||||
Name: p.Name,
|
||||
Tags: p.Tags,
|
||||
Timestamp: p.Timestamp.Time(),
|
||||
Values: p.Values,
|
||||
}
|
||||
if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []influxdb.Point{iPoint}); err != nil {
|
||||
writeError(influxdb.Result{Err: err}, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
|
@ -12,6 +13,7 @@ import (
|
|||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/httpd"
|
||||
|
@ -23,6 +25,117 @@ func init() {
|
|||
influxdb.BcryptCost = 4
|
||||
}
|
||||
|
||||
func TestBatchWrite_UnmarshalEpoch(t *testing.T) {
|
||||
now := time.Now()
|
||||
tests := []struct {
|
||||
name string
|
||||
epoch int64
|
||||
precision string
|
||||
expected time.Time
|
||||
}{
|
||||
{
|
||||
name: "nanoseconds",
|
||||
epoch: now.UnixNano(),
|
||||
precision: "n",
|
||||
expected: now,
|
||||
},
|
||||
{
|
||||
name: "microseconds",
|
||||
epoch: now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond),
|
||||
precision: "u",
|
||||
expected: now.Round(time.Microsecond),
|
||||
},
|
||||
{
|
||||
name: "milliseconds",
|
||||
epoch: now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond),
|
||||
precision: "ms",
|
||||
expected: now.Round(time.Millisecond),
|
||||
},
|
||||
{
|
||||
name: "seconds",
|
||||
epoch: now.Round(time.Second).UnixNano() / int64(time.Second),
|
||||
precision: "s",
|
||||
expected: now.Round(time.Second),
|
||||
},
|
||||
{
|
||||
name: "minutes",
|
||||
epoch: now.Round(time.Minute).UnixNano() / int64(time.Minute),
|
||||
precision: "m",
|
||||
expected: now.Round(time.Minute),
|
||||
},
|
||||
{
|
||||
name: "hours",
|
||||
epoch: now.Round(time.Hour).UnixNano() / int64(time.Hour),
|
||||
precision: "h",
|
||||
expected: now.Round(time.Hour),
|
||||
},
|
||||
{
|
||||
name: "max int64",
|
||||
epoch: 9223372036854775807,
|
||||
precision: "n",
|
||||
expected: time.Unix(0, 9223372036854775807),
|
||||
},
|
||||
{
|
||||
name: "100 years from now",
|
||||
epoch: now.Add(time.Hour * 24 * 365 * 100).UnixNano(),
|
||||
precision: "n",
|
||||
expected: now.Add(time.Hour * 24 * 365 * 100),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Logf("testing %q\n", test.name)
|
||||
data := []byte(fmt.Sprintf(`{"timestamp": %d, "precision":"%s"}`, test.epoch, test.precision))
|
||||
t.Logf("json: %s", string(data))
|
||||
var br httpd.BatchWrite
|
||||
err := json.Unmarshal(data, &br)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
|
||||
}
|
||||
if br.Timestamp != test.expected {
|
||||
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, br.Timestamp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchWrite_UnmarshalRFC(t *testing.T) {
|
||||
now := time.Now()
|
||||
tests := []struct {
|
||||
name string
|
||||
rfc string
|
||||
now time.Time
|
||||
expected time.Time
|
||||
}{
|
||||
{
|
||||
name: "RFC3339Nano",
|
||||
rfc: time.RFC3339Nano,
|
||||
now: now,
|
||||
expected: now,
|
||||
},
|
||||
{
|
||||
name: "RFC3339",
|
||||
rfc: time.RFC3339,
|
||||
now: now.Round(time.Second),
|
||||
expected: now.Round(time.Second),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Logf("testing %q\n", test.name)
|
||||
ts := test.now.Format(test.rfc)
|
||||
data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts))
|
||||
t.Logf("json: %s", string(data))
|
||||
var br httpd.BatchWrite
|
||||
err := json.Unmarshal(data, &br)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
|
||||
}
|
||||
if br.Timestamp != test.expected {
|
||||
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, br.Timestamp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandler_Databases(t *testing.T) {
|
||||
srvr := OpenServer(NewMessagingClient())
|
||||
srvr.CreateDatabase("foo")
|
||||
|
@ -1273,6 +1386,8 @@ func TestHandler_serveShowTagValues(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// batchWrite JSON Unmarshal tests
|
||||
|
||||
// Utility functions for this test suite.
|
||||
|
||||
func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/url"
|
||||
"os"
|
||||
"reflect"
|
||||
|
@ -899,6 +900,39 @@ func TestServer_TagNamesBySeries(t *testing.T) { t.Skip("pending") }
|
|||
func TestServer_TagValues(t *testing.T) { t.Skip("pending") }
|
||||
func TestServer_TagValuesBySeries(t *testing.T) { t.Skip("pending") }
|
||||
|
||||
// Point JSON Unmarshal tests
|
||||
|
||||
func TestbatchWrite_UnmarshalEpoch(t *testing.T) {
|
||||
var (
|
||||
now = time.Now()
|
||||
nanos = now.UnixNano()
|
||||
micros = nanos / int64(time.Microsecond)
|
||||
millis = nanos / int64(time.Millisecond)
|
||||
seconds = nanos / int64(time.Second)
|
||||
minutes = nanos / int64(time.Minute)
|
||||
hours = nanos / int64(time.Hour)
|
||||
)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
epoch int64
|
||||
}{
|
||||
{name: "nanos", epoch: nanos},
|
||||
{name: "micros", epoch: micros},
|
||||
{name: "millis", epoch: millis},
|
||||
{name: "seconds", epoch: seconds},
|
||||
{name: "minutes", epoch: minutes},
|
||||
{name: "hours", epoch: hours},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
json := fmt.Sprintf(`"points": [{timestamp: "%d"}`, test.epoch)
|
||||
log.Println(json)
|
||||
t.Fatal("foo")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Server is a wrapping test struct for influxdb.Server.
|
||||
type Server struct {
|
||||
*influxdb.Server
|
||||
|
|
Loading…
Reference in New Issue