diff --git a/client/influxdb.go b/client/influxdb.go index 2a0484ab25..e46137a2fd 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -3,11 +3,13 @@ package client import ( "bytes" "encoding/json" + "errors" + "fmt" "net/http" "net/url" "time" - "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/influxql" ) type Config struct { @@ -31,7 +33,7 @@ type Query struct { type Write struct { Database string RetentionPolicy string - Points []influxdb.Point + Points []Point } func NewClient(c Config) (*Client, error) { @@ -44,7 +46,7 @@ func NewClient(c Config) (*Client, error) { return &client, nil } -func (c *Client) Query(q Query) (*influxdb.Results, error) { +func (c *Client) Query(q Query) (*Results, error) { u := c.url u.Path = "query" @@ -59,7 +61,7 @@ func (c *Client) Query(q Query) (*influxdb.Results, error) { } defer resp.Body.Close() - var results influxdb.Results + var results Results err = json.NewDecoder(resp.Body).Decode(&results) if err != nil { return nil, err @@ -67,12 +69,12 @@ func (c *Client) Query(q Query) (*influxdb.Results, error) { return &results, nil } -func (c *Client) Write(writes ...Write) (*influxdb.Results, error) { +func (c *Client) Write(writes ...Write) (*Results, error) { c.url.Path = "write" type data struct { - Points []influxdb.Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` + Points []Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` } d := []data{} @@ -89,7 +91,7 @@ func (c *Client) Write(writes ...Write) (*influxdb.Results, error) { } defer resp.Body.Close() - var results influxdb.Results + var results Results err = json.NewDecoder(resp.Body).Decode(&results) if err != nil { return nil, err @@ -109,27 +111,225 @@ func (c *Client) Ping() (time.Duration, string, error) { return time.Since(now), version, nil } +// Structs + +// Result represents a resultset returned from a single statement. +type Result struct { + Rows []*influxql.Row + Err error +} + +// MarshalJSON encodes the result into JSON. +func (r *Result) MarshalJSON() ([]byte, error) { + // Define a struct that outputs "error" as a string. + var o struct { + Rows []*influxql.Row `json:"rows,omitempty"` + Err string `json:"error,omitempty"` + } + + // Copy fields to output struct. + o.Rows = r.Rows + if r.Err != nil { + o.Err = r.Err.Error() + } + + return json.Marshal(&o) +} + +// UnmarshalJSON decodes the data into the Result struct +func (r *Result) UnmarshalJSON(b []byte) error { + var o struct { + Rows []*influxql.Row `json:"rows,omitempty"` + Err string `json:"error,omitempty"` + } + + err := json.Unmarshal(b, &o) + if err != nil { + return err + } + r.Rows = o.Rows + if o.Err != "" { + r.Err = errors.New(o.Err) + } + return nil +} + +// Results represents a list of statement results. +type Results struct { + Results []*Result + Err error +} + +func (r Results) MarshalJSON() ([]byte, error) { + // Define a struct that outputs "error" as a string. + var o struct { + Results []*Result `json:"results,omitempty"` + Err string `json:"error,omitempty"` + } + + // Copy fields to output struct. + o.Results = r.Results + if r.Err != nil { + o.Err = r.Err.Error() + } + + return json.Marshal(&o) +} + +// UnmarshalJSON decodes the data into the Results struct +func (r *Results) UnmarshalJSON(b []byte) error { + var o struct { + Results []*Result `json:"results,omitempty"` + Err string `json:"error,omitempty"` + } + + err := json.Unmarshal(b, &o) + if err != nil { + return err + } + r.Results = o.Results + if o.Err != "" { + r.Err = errors.New(o.Err) + } + return nil +} + +// Error returns the first error from any statement. +// Returns nil if no errors occurred on any statements. +func (a Results) Error() error { + for _, r := range a.Results { + if r.Err != nil { + return r.Err + } + } + return nil +} + +// Timestamp is a custom type so we marshal JSON properly into UTC nanosecond time +type Timestamp time.Time + +// Time returns the time represented by the Timestamp +func (t Timestamp) Time() time.Time { + return time.Time(t) +} + +// MarshalJSON returns time in UTC with nanoseconds +func (t Timestamp) MarshalJSON() ([]byte, error) { + // Always send back in UTC with nanoseconds + s := t.Time().UTC().Format(time.RFC3339Nano) + return []byte(`"` + s + `"`), nil +} + +// Point defines the values that will be written to the database +type Point struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Timestamp Timestamp `json:"timestamp"` + Values map[string]interface{} `json:"values"` + Precision string `json:"precision"` +} + +// UnmarshalJSON decodes the data into the Point struct +func (p *Point) UnmarshalJSON(b []byte) error { + var normal struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` + Values map[string]interface{} `json:"values"` + } + var epoch struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Timestamp int64 `json:"timestamp"` + Precision string `json:"precision"` + Values map[string]interface{} `json:"values"` + } + + if err := func() error { + var err error + if err = json.Unmarshal(b, &epoch); err != nil { + return err + } + // Convert from epoch to time.Time + ts, err := EpochToTime(epoch.Timestamp, epoch.Precision) + if err != nil { + return err + } + p.Name = epoch.Name + p.Tags = epoch.Tags + p.Timestamp = Timestamp(ts) + p.Precision = epoch.Precision + p.Values = epoch.Values + return nil + }(); err == nil { + return nil + } + + if err := json.Unmarshal(b, &normal); err != nil { + return err + } + normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision) + p.Name = normal.Name + p.Tags = normal.Tags + p.Timestamp = Timestamp(normal.Timestamp) + p.Precision = normal.Precision + p.Values = normal.Values + + return nil +} + // utility functions func (c *Client) Addr() string { return c.url.String() } -//func (c *Client) urlFor(path string) (*url.URL, error) { -//var u *url.URL -//u, err := url.Parse(fmt.Sprintf("%s%s", c.addr, path)) -//if err != nil { -//return nil, err -//} -//if c.username != "" { -//u.User = url.UserPassword(c.username, c.password) -//} -//u.Scheme = "http" -//return u, nil -//} - // helper functions +// EpochToTime takes a unix epoch time and uses precision to return back a time.Time +func EpochToTime(epoch int64, precision string) (time.Time, error) { + if precision == "" { + precision = "s" + } + var t time.Time + switch precision { + case "h": + t = time.Unix(0, epoch*int64(time.Hour)) + case "m": + t = time.Unix(0, epoch*int64(time.Minute)) + case "s": + t = time.Unix(0, epoch*int64(time.Second)) + case "ms": + t = time.Unix(0, epoch*int64(time.Millisecond)) + case "u": + t = time.Unix(0, epoch*int64(time.Microsecond)) + case "n": + t = time.Unix(0, epoch) + default: + return time.Time{}, fmt.Errorf("Unknowm precision %q", precision) + } + return t, nil +} + +// SetPrecision will round a time to the specified precision +func SetPrecision(t time.Time, precision string) time.Time { + switch precision { + case "n": + case "u": + return t.Round(time.Microsecond) + case "ms": + return t.Round(time.Millisecond) + case "s": + return t.Round(time.Second) + case "m": + return t.Round(time.Minute) + case "h": + return t.Round(time.Hour) + } + return t +} + func detect(values ...string) string { for _, v := range values { if v != "" { diff --git a/client/influxdb_test.go b/client/influxdb_test.go index 6131c8796b..70c76438a5 100644 --- a/client/influxdb_test.go +++ b/client/influxdb_test.go @@ -2,10 +2,12 @@ package client_test import ( "encoding/json" + "fmt" "net/http" "net/http/httptest" "net/url" "testing" + "time" "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/client" @@ -116,6 +118,146 @@ func TestClient_Write(t *testing.T) { } } +func TestPoint_UnmarshalEpoch(t *testing.T) { + now := time.Now() + tests := []struct { + name string + epoch int64 + precision string + expected time.Time + }{ + { + name: "nanoseconds", + epoch: now.UnixNano(), + precision: "n", + expected: now, + }, + { + name: "microseconds", + epoch: now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond), + precision: "u", + expected: now.Round(time.Microsecond), + }, + { + name: "milliseconds", + epoch: now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond), + precision: "ms", + expected: now.Round(time.Millisecond), + }, + { + name: "seconds", + epoch: now.Round(time.Second).UnixNano() / int64(time.Second), + precision: "s", + expected: now.Round(time.Second), + }, + { + name: "minutes", + epoch: now.Round(time.Minute).UnixNano() / int64(time.Minute), + precision: "m", + expected: now.Round(time.Minute), + }, + { + name: "hours", + epoch: now.Round(time.Hour).UnixNano() / int64(time.Hour), + precision: "h", + expected: now.Round(time.Hour), + }, + { + name: "max int64", + epoch: 9223372036854775807, + precision: "n", + expected: time.Unix(0, 9223372036854775807), + }, + { + name: "100 years from now", + epoch: now.Add(time.Hour * 24 * 365 * 100).UnixNano(), + precision: "n", + expected: now.Add(time.Hour * 24 * 365 * 100), + }, + } + + for _, test := range tests { + t.Logf("testing %q\n", test.name) + data := []byte(fmt.Sprintf(`{"timestamp": %d, "precision":"%s"}`, test.epoch, test.precision)) + t.Logf("json: %s", string(data)) + var p client.Point + err := json.Unmarshal(data, &p) + if err != nil { + t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) + } + if p.Timestamp.Time() != test.expected { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp) + } + } +} + +func TestPoint_UnmarshalRFC(t *testing.T) { + now := time.Now() + tests := []struct { + name string + rfc string + now time.Time + expected time.Time + }{ + { + name: "RFC3339Nano", + rfc: time.RFC3339Nano, + now: now, + expected: now, + }, + { + name: "RFC3339", + rfc: time.RFC3339, + now: now.Round(time.Second), + expected: now.Round(time.Second), + }, + } + + for _, test := range tests { + t.Logf("testing %q\n", test.name) + ts := test.now.Format(test.rfc) + data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts)) + t.Logf("json: %s", string(data)) + var p client.Point + err := json.Unmarshal(data, &p) + if err != nil { + t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) + } + if p.Timestamp.Time() != test.expected { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp) + } + } +} + +func TestEpochToTime(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + epoch int64 + precision string + expected time.Time + }{ + {name: "nanoseconds", epoch: now.UnixNano(), precision: "n", expected: now}, + {name: "microseconds", epoch: now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond), precision: "u", expected: now.Round(time.Microsecond)}, + {name: "milliseconds", epoch: now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond), precision: "ms", expected: now.Round(time.Millisecond)}, + {name: "seconds", epoch: now.Round(time.Second).UnixNano() / int64(time.Second), precision: "s", expected: now.Round(time.Second)}, + {name: "minutes", epoch: now.Round(time.Minute).UnixNano() / int64(time.Minute), precision: "m", expected: now.Round(time.Minute)}, + {name: "hours", epoch: now.Round(time.Hour).UnixNano() / int64(time.Hour), precision: "h", expected: now.Round(time.Hour)}, + } + + for _, test := range tests { + t.Logf("testing %q\n", test.name) + tm, e := client.EpochToTime(test.epoch, test.precision) + if e != nil { + t.Fatalf("unexpected error: expected %v, actual: %v", nil, e) + } + if tm != test.expected { + t.Fatalf("unexpected time: expected %v, actual %v", test.expected, tm) + } + } +} + // helper functions func emptyTestServer() *httptest.Server { diff --git a/httpd/handler.go b/httpd/handler.go index 86f00cf173..87895f1ac3 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -16,6 +16,7 @@ import ( "github.com/bmizerany/pat" "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/influxql" ) @@ -140,17 +141,73 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ httpResults(w, results, pretty) } -type batchWrite struct { - Points []influxdb.Point `json:"points"` +// BatchWrite is used to send batch write data to the http /write endpoint +type BatchWrite struct { + Points []client.Point `json:"points"` Database string `json:"database"` RetentionPolicy string `json:"retentionPolicy"` Tags map[string]string `json:"tags"` Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` +} + +// UnmarshalJSON decodes the data into the batchWrite struct +func (br *BatchWrite) UnmarshalJSON(b []byte) error { + var normal struct { + Points []client.Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` + } + var epoch struct { + Points []client.Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp int64 `json:"timestamp"` + Precision string `json:"precision"` + } + + if err := func() error { + var err error + if err = json.Unmarshal(b, &epoch); err != nil { + return err + } + // Convert from epoch to time.Time + ts, err := client.EpochToTime(epoch.Timestamp, epoch.Precision) + if err != nil { + return err + } + br.Points = epoch.Points + br.Database = epoch.Database + br.RetentionPolicy = epoch.RetentionPolicy + br.Tags = epoch.Tags + br.Timestamp = ts + br.Precision = epoch.Precision + return nil + }(); err == nil { + return nil + } + + if err := json.Unmarshal(b, &normal); err != nil { + return err + } + normal.Timestamp = client.SetPrecision(normal.Timestamp, normal.Precision) + br.Points = normal.Points + br.Database = normal.Database + br.RetentionPolicy = normal.RetentionPolicy + br.Tags = normal.Tags + br.Timestamp = normal.Timestamp + br.Precision = normal.Precision + + return nil } // serveWrite receives incoming series data and writes it to the database. func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) { - var br batchWrite + var br BatchWrite dec := json.NewDecoder(r.Body) @@ -187,9 +244,13 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ } for _, p := range br.Points { - if p.Timestamp.IsZero() { - p.Timestamp = br.Timestamp + if p.Timestamp.Time().IsZero() { + p.Timestamp = client.Timestamp(br.Timestamp) } + if p.Precision == "" && br.Precision != "" { + p.Precision = br.Precision + } + p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision)) if len(br.Tags) > 0 { for k, _ := range br.Tags { if p.Tags[k] == "" { @@ -197,7 +258,14 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ } } } - if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []influxdb.Point{p}); err != nil { + // Need to convert from a client.Point to a influxdb.Point + iPoint := influxdb.Point{ + Name: p.Name, + Tags: p.Tags, + Timestamp: p.Timestamp.Time(), + Values: p.Values, + } + if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []influxdb.Point{iPoint}); err != nil { writeError(influxdb.Result{Err: err}, http.StatusInternalServerError) return } diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 9dda496a7f..099d3c5efb 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/base64" "encoding/json" + "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -12,6 +13,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/httpd" @@ -23,6 +25,117 @@ func init() { influxdb.BcryptCost = 4 } +func TestBatchWrite_UnmarshalEpoch(t *testing.T) { + now := time.Now() + tests := []struct { + name string + epoch int64 + precision string + expected time.Time + }{ + { + name: "nanoseconds", + epoch: now.UnixNano(), + precision: "n", + expected: now, + }, + { + name: "microseconds", + epoch: now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond), + precision: "u", + expected: now.Round(time.Microsecond), + }, + { + name: "milliseconds", + epoch: now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond), + precision: "ms", + expected: now.Round(time.Millisecond), + }, + { + name: "seconds", + epoch: now.Round(time.Second).UnixNano() / int64(time.Second), + precision: "s", + expected: now.Round(time.Second), + }, + { + name: "minutes", + epoch: now.Round(time.Minute).UnixNano() / int64(time.Minute), + precision: "m", + expected: now.Round(time.Minute), + }, + { + name: "hours", + epoch: now.Round(time.Hour).UnixNano() / int64(time.Hour), + precision: "h", + expected: now.Round(time.Hour), + }, + { + name: "max int64", + epoch: 9223372036854775807, + precision: "n", + expected: time.Unix(0, 9223372036854775807), + }, + { + name: "100 years from now", + epoch: now.Add(time.Hour * 24 * 365 * 100).UnixNano(), + precision: "n", + expected: now.Add(time.Hour * 24 * 365 * 100), + }, + } + + for _, test := range tests { + t.Logf("testing %q\n", test.name) + data := []byte(fmt.Sprintf(`{"timestamp": %d, "precision":"%s"}`, test.epoch, test.precision)) + t.Logf("json: %s", string(data)) + var br httpd.BatchWrite + err := json.Unmarshal(data, &br) + if err != nil { + t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) + } + if br.Timestamp != test.expected { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, br.Timestamp) + } + } +} + +func TestBatchWrite_UnmarshalRFC(t *testing.T) { + now := time.Now() + tests := []struct { + name string + rfc string + now time.Time + expected time.Time + }{ + { + name: "RFC3339Nano", + rfc: time.RFC3339Nano, + now: now, + expected: now, + }, + { + name: "RFC3339", + rfc: time.RFC3339, + now: now.Round(time.Second), + expected: now.Round(time.Second), + }, + } + + for _, test := range tests { + t.Logf("testing %q\n", test.name) + ts := test.now.Format(test.rfc) + data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts)) + t.Logf("json: %s", string(data)) + var br httpd.BatchWrite + err := json.Unmarshal(data, &br) + if err != nil { + t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) + } + if br.Timestamp != test.expected { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, br.Timestamp) + } + } +} + func TestHandler_Databases(t *testing.T) { srvr := OpenServer(NewMessagingClient()) srvr.CreateDatabase("foo") @@ -1273,6 +1386,8 @@ func TestHandler_serveShowTagValues(t *testing.T) { } } +// batchWrite JSON Unmarshal tests + // Utility functions for this test suite. func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) { diff --git a/server_test.go b/server_test.go index a482ee350f..8824ec79be 100644 --- a/server_test.go +++ b/server_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "log" "net/url" "os" "reflect" @@ -899,6 +900,39 @@ func TestServer_TagNamesBySeries(t *testing.T) { t.Skip("pending") } func TestServer_TagValues(t *testing.T) { t.Skip("pending") } func TestServer_TagValuesBySeries(t *testing.T) { t.Skip("pending") } +// Point JSON Unmarshal tests + +func TestbatchWrite_UnmarshalEpoch(t *testing.T) { + var ( + now = time.Now() + nanos = now.UnixNano() + micros = nanos / int64(time.Microsecond) + millis = nanos / int64(time.Millisecond) + seconds = nanos / int64(time.Second) + minutes = nanos / int64(time.Minute) + hours = nanos / int64(time.Hour) + ) + + tests := []struct { + name string + epoch int64 + }{ + {name: "nanos", epoch: nanos}, + {name: "micros", epoch: micros}, + {name: "millis", epoch: millis}, + {name: "seconds", epoch: seconds}, + {name: "minutes", epoch: minutes}, + {name: "hours", epoch: hours}, + } + + for _, test := range tests { + json := fmt.Sprintf(`"points": [{timestamp: "%d"}`, test.epoch) + log.Println(json) + t.Fatal("foo") + } + +} + // Server is a wrapping test struct for influxdb.Server. type Server struct { *influxdb.Server