From 8643aa28aaa3156684f0f7972ead5f6368dc6ed1 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 28 Jan 2015 19:30:00 -0700 Subject: [PATCH 01/10] wip starting support for http endpoint taking epoch time --- server.go | 42 ++++++++++++++++++++++++++++++++++++++++++ server_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/server.go b/server.go index 346c70ef21..0527a8898a 100644 --- a/server.go +++ b/server.go @@ -1313,6 +1313,48 @@ type Point struct { Values map[string]interface{} } +// UnmarshalJSON decodes the data into the Point struct +func (p *Point) UnmarshalJSON(b []byte) error { + var normal struct { + Name string + Tags map[string]string + Timestamp time.Time + Values map[string]interface{} + } + var epoch struct { + Name string + Tags map[string]string + Timestamp string `json:",string"` + Precision string // Can be h, m, s, ms, u, n, defaults to s if blank + Values map[string]interface{} + } + + err := json.Unmarshal(b, &epoch) + if err == nil { + // Convert from epoch to time.Time + switch epoch.Precision { + case "h": + case "m": + case "ms": + case "u": + case "n": + default: // defaults to seconds + + } + } + + err = json.Unmarshal(b, &normal) + if err != nil { + p = &Point{ + Name: normal.Name, + Tags: normal.Tags, + Timestamp: normal.Timestamp, + Values: normal.Values, + } + } + return nil +} + // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) { diff --git a/server_test.go b/server_test.go index a482ee350f..b22ffbdd14 100644 --- a/server_test.go +++ b/server_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "log" "net/url" "os" "reflect" @@ -899,6 +900,37 @@ func TestServer_TagNamesBySeries(t *testing.T) { t.Skip("pending") } func TestServer_TagValues(t *testing.T) { t.Skip("pending") } func TestServer_TagValuesBySeries(t *testing.T) { t.Skip("pending") } +// Point tests + +func TestPoint_UnmarshalEpoch(t *testing.T) { + var ( + now = time.Now() + nanos = now.UnixNano() + micros = nanos / int64(time.Microsecond) + millis = nanos / int64(time.Millisecond) + seconds = nanos / int64(time.Second) + minutes = nanos / int64(time.Minute) + hours = nanos / int64(time.Hour) + ) + + tests := []struct { + name string + epoch int64 + }{ + {name: "nanos", epoch: nanos}, + {name: "micros", epoch: micros}, + {name: "millis", epoch: millis}, + {name: "seconds", epoch: seconds}, + {name: "minutes", epoch: minutes}, + {name: "hours", epoch: hours}, + } + + for _, test := range tests { + log.Println(test.name) + } + +} + // Server is a wrapping test struct for influxdb.Server. type Server struct { *influxdb.Server From 37aad6fd20c2457de8daee16f459b9863cae3206 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 29 Jan 2015 11:10:13 -0700 Subject: [PATCH 02/10] client utility function EpochToTime --- client/influxdb.go | 26 ++++++++++++++++++++++++++ client/influxdb_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/client/influxdb.go b/client/influxdb.go index 2a0484ab25..81788458af 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -3,6 +3,7 @@ package client import ( "bytes" "encoding/json" + "fmt" "net/http" "net/url" "time" @@ -130,6 +131,31 @@ func (c *Client) Addr() string { // helper functions +func EpochToTime(epoch int64, precision string) (time.Time, error) { + if precision == "" { + precision = "s" + } + var t time.Time + switch precision { + case "h": + t = time.Unix(0, epoch*int64(time.Hour)) + case "m": + t = time.Unix(0, epoch*int64(time.Minute)) + case "s": + t = time.Unix(0, epoch*int64(time.Second)) + case "ms": + t = time.Unix(0, epoch*int64(time.Millisecond)) + case "u": + t = time.Unix(0, epoch*int64(time.Microsecond)) + case "n": + t = time.Unix(0, epoch) + default: + return time.Time{}, fmt.Errorf("Unknowm precision %q", precision) + } + return t, nil + +} + func detect(values ...string) string { for _, v := range values { if v != "" { diff --git a/client/influxdb_test.go b/client/influxdb_test.go index 6131c8796b..e0cc6f6609 100644 --- a/client/influxdb_test.go +++ b/client/influxdb_test.go @@ -6,6 +6,7 @@ import ( "net/http/httptest" "net/url" "testing" + "time" "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/client" @@ -116,6 +117,35 @@ func TestClient_Write(t *testing.T) { } } +func TestEpochToTime(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + epoch int64 + precision string + expected time.Time + }{ + {name: "nanoseconds", epoch: now.UnixNano(), precision: "n", expected: now}, + {name: "microseconds", epoch: now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond), precision: "u", expected: now.Round(time.Microsecond)}, + {name: "milliseconds", epoch: now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond), precision: "ms", expected: now.Round(time.Millisecond)}, + {name: "seconds", epoch: now.Round(time.Second).UnixNano() / int64(time.Second), precision: "s", expected: now.Round(time.Second)}, + {name: "minutes", epoch: now.Round(time.Minute).UnixNano() / int64(time.Minute), precision: "m", expected: now.Round(time.Minute)}, + {name: "hours", epoch: now.Round(time.Hour).UnixNano() / int64(time.Hour), precision: "h", expected: now.Round(time.Hour)}, + } + + for _, test := range tests { + t.Logf("testing %q\n", test.name) + tm, e := client.EpochToTime(test.epoch, test.precision) + if e != nil { + t.Fatalf("unexpected error: expected %v, actual: %v", nil, e) + } + if tm != test.expected { + t.Fatalf("unexpected time: expected %v, actual %v", test.expected, tm) + } + } +} + // helper functions func emptyTestServer() *httptest.Server { From a724e3b86ced306fc5e76a9d25e8055f35beee49 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 29 Jan 2015 12:28:13 -0700 Subject: [PATCH 03/10] BatchWrite supports unmarshalling epoch with precision, RFC3339, RFC3339Nano --- httpd/handler.go | 56 ++++++++++++++++++++- httpd/handler_test.go | 112 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 166 insertions(+), 2 deletions(-) diff --git a/httpd/handler.go b/httpd/handler.go index 86f00cf173..ff61222f4d 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -16,6 +16,7 @@ import ( "github.com/bmizerany/pat" "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/influxql" ) @@ -140,7 +141,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ httpResults(w, results, pretty) } -type batchWrite struct { +type BatchWrite struct { Points []influxdb.Point `json:"points"` Database string `json:"database"` RetentionPolicy string `json:"retentionPolicy"` @@ -148,9 +149,60 @@ type batchWrite struct { Timestamp time.Time `json:"timestamp"` } +// UnmarshalJSON decodes the data into the batchWrite struct +func (br *BatchWrite) UnmarshalJSON(b []byte) error { + var normal struct { + Points []influxdb.Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` + } + var epoch struct { + Points []influxdb.Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp int64 `json:"timestamp"` + Precision string `json:"precision"` + } + + if err := func() error { + var err error + if err = json.Unmarshal(b, &epoch); err != nil { + return err + } + // Convert from epoch to time.Time + ts, err := client.EpochToTime(epoch.Timestamp, epoch.Precision) + log.Println(ts, err) + if err != nil { + return err + } + br.Points = epoch.Points + br.Database = epoch.Database + br.RetentionPolicy = epoch.RetentionPolicy + br.Tags = epoch.Tags + br.Timestamp = ts + return nil + }(); err == nil { + return nil + } + + if err := json.Unmarshal(b, &normal); err != nil { + return err + } + br.Points = normal.Points + br.Database = normal.Database + br.RetentionPolicy = normal.RetentionPolicy + br.Tags = normal.Tags + br.Timestamp = normal.Timestamp + + return nil +} + // serveWrite receives incoming series data and writes it to the database. func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) { - var br batchWrite + var br BatchWrite dec := json.NewDecoder(r.Body) diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 9dda496a7f..4147a8530c 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/base64" "encoding/json" + "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -12,6 +13,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/httpd" @@ -23,6 +25,114 @@ func init() { influxdb.BcryptCost = 4 } +func TestBatchWrite_UnmarshalEpoch(t *testing.T) { + now := time.Now() + tests := []struct { + name string + epoch int64 + precision string + expected time.Time + }{ + { + name: "nanoseconds", + epoch: now.UnixNano(), + precision: "n", + expected: now, + }, + { + name: "microseconds", + epoch: now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond), + precision: "u", + expected: now.Round(time.Microsecond), + }, + { + name: "milliseconds", + epoch: now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond), + precision: "ms", + expected: now.Round(time.Millisecond), + }, + { + name: "seconds", + epoch: now.Round(time.Second).UnixNano() / int64(time.Second), + precision: "s", + expected: now.Round(time.Second), + }, + { + name: "minutes", + epoch: now.Round(time.Minute).UnixNano() / int64(time.Minute), + precision: "m", + expected: now.Round(time.Minute), + }, + { + name: "hours", + epoch: now.Round(time.Hour).UnixNano() / int64(time.Hour), + precision: "h", + expected: now.Round(time.Hour), + }, + { + name: "max int64", + epoch: 9223372036854775807, + precision: "n", + expected: time.Unix(0, 9223372036854775807), + }, + { + name: "100 years from now", + epoch: now.Add(time.Hour * 24 * 365 * 100).UnixNano(), + precision: "n", + expected: now.Add(time.Hour * 24 * 365 * 100), + }, + } + + for _, test := range tests { + t.Logf("testing %q\n", test.name) + data := []byte(fmt.Sprintf(`{"timestamp": %d, "precision":"%s"}`, test.epoch, test.precision)) + t.Logf("json: %s", string(data)) + var br httpd.BatchWrite + err := json.Unmarshal(data, &br) + if err != nil { + t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) + } + if br.Timestamp != test.expected { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, br.Timestamp) + } + } +} + +func TestBatchWrite_UnmarshalRFC(t *testing.T) { + now := time.Now() + tests := []struct { + name string + rfc string + expected time.Time + }{ + { + name: "RFC3339Nano", + rfc: time.RFC3339Nano, + expected: now, + }, + { + name: "RFC3339", + rfc: time.RFC3339, + expected: now.Round(time.Second), + }, + } + + for _, test := range tests { + t.Logf("testing %q\n", test.name) + ts := now.Format(test.rfc) + data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts)) + t.Logf("json: %s", string(data)) + var br httpd.BatchWrite + err := json.Unmarshal(data, &br) + if err != nil { + t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) + } + if br.Timestamp != test.expected { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, br.Timestamp) + } + } +} + func TestHandler_Databases(t *testing.T) { srvr := OpenServer(NewMessagingClient()) srvr.CreateDatabase("foo") @@ -1273,6 +1383,8 @@ func TestHandler_serveShowTagValues(t *testing.T) { } } +// batchWrite JSON Unmarshal tests + // Utility functions for this test suite. func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) { From 42630c38dce0e679a77dd4ef239fb51acd4aed63 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 29 Jan 2015 12:30:41 -0700 Subject: [PATCH 04/10] removing log output --- httpd/handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/httpd/handler.go b/httpd/handler.go index ff61222f4d..20b446bd13 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -174,7 +174,6 @@ func (br *BatchWrite) UnmarshalJSON(b []byte) error { } // Convert from epoch to time.Time ts, err := client.EpochToTime(epoch.Timestamp, epoch.Precision) - log.Println(ts, err) if err != nil { return err } From acbaf9aae472edcc59d185e4dbd8d8079a98eb32 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 29 Jan 2015 12:32:31 -0700 Subject: [PATCH 05/10] stabalize tests for time --- httpd/handler_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 4147a8530c..099d3c5efb 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -103,23 +103,26 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) { tests := []struct { name string rfc string + now time.Time expected time.Time }{ { name: "RFC3339Nano", rfc: time.RFC3339Nano, + now: now, expected: now, }, { name: "RFC3339", rfc: time.RFC3339, + now: now.Round(time.Second), expected: now.Round(time.Second), }, } for _, test := range tests { t.Logf("testing %q\n", test.name) - ts := now.Format(test.rfc) + ts := test.now.Format(test.rfc) data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts)) t.Logf("json: %s", string(data)) var br httpd.BatchWrite From 4b84a2d1749d14ba3ee8630671928fe16a4acc59 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 29 Jan 2015 14:07:43 -0700 Subject: [PATCH 06/10] Point/Results duplicated in Client package. Custom marshaling for timestamps --- client/influxdb.go | 167 +++++++++++++++++++++++++++++++++++++--- client/influxdb_test.go | 112 +++++++++++++++++++++++++++ server.go | 42 ---------- server_test.go | 8 +- 4 files changed, 275 insertions(+), 54 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index 81788458af..8ccf41dcf0 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -3,12 +3,13 @@ package client import ( "bytes" "encoding/json" + "errors" "fmt" "net/http" "net/url" "time" - "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/influxql" ) type Config struct { @@ -32,7 +33,7 @@ type Query struct { type Write struct { Database string RetentionPolicy string - Points []influxdb.Point + Points []Point } func NewClient(c Config) (*Client, error) { @@ -45,7 +46,7 @@ func NewClient(c Config) (*Client, error) { return &client, nil } -func (c *Client) Query(q Query) (*influxdb.Results, error) { +func (c *Client) Query(q Query) (*Results, error) { u := c.url u.Path = "query" @@ -60,7 +61,7 @@ func (c *Client) Query(q Query) (*influxdb.Results, error) { } defer resp.Body.Close() - var results influxdb.Results + var results Results err = json.NewDecoder(resp.Body).Decode(&results) if err != nil { return nil, err @@ -68,12 +69,12 @@ func (c *Client) Query(q Query) (*influxdb.Results, error) { return &results, nil } -func (c *Client) Write(writes ...Write) (*influxdb.Results, error) { +func (c *Client) Write(writes ...Write) (*Results, error) { c.url.Path = "write" type data struct { - Points []influxdb.Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` + Points []Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` } d := []data{} @@ -90,7 +91,7 @@ func (c *Client) Write(writes ...Write) (*influxdb.Results, error) { } defer resp.Body.Close() - var results influxdb.Results + var results Results err = json.NewDecoder(resp.Body).Decode(&results) if err != nil { return nil, err @@ -110,6 +111,154 @@ func (c *Client) Ping() (time.Duration, string, error) { return time.Since(now), version, nil } +// Structs + +// Result represents a resultset returned from a single statement. +type Result struct { + Rows []*influxql.Row + Err error +} + +// MarshalJSON encodes the result into JSON. +func (r *Result) MarshalJSON() ([]byte, error) { + // Define a struct that outputs "error" as a string. + var o struct { + Rows []*influxql.Row `json:"rows,omitempty"` + Err string `json:"error,omitempty"` + } + + // Copy fields to output struct. + o.Rows = r.Rows + if r.Err != nil { + o.Err = r.Err.Error() + } + + return json.Marshal(&o) +} + +// UnmarshalJSON decodes the data into the Result struct +func (r *Result) UnmarshalJSON(b []byte) error { + var o struct { + Rows []*influxql.Row `json:"rows,omitempty"` + Err string `json:"error,omitempty"` + } + + err := json.Unmarshal(b, &o) + if err != nil { + return err + } + r.Rows = o.Rows + if o.Err != "" { + r.Err = errors.New(o.Err) + } + return nil +} + +// Results represents a list of statement results. +type Results struct { + Results []*Result + Err error +} + +func (r Results) MarshalJSON() ([]byte, error) { + // Define a struct that outputs "error" as a string. + var o struct { + Results []*Result `json:"results,omitempty"` + Err string `json:"error,omitempty"` + } + + // Copy fields to output struct. + o.Results = r.Results + if r.Err != nil { + o.Err = r.Err.Error() + } + + return json.Marshal(&o) +} + +// UnmarshalJSON decodes the data into the Results struct +func (r *Results) UnmarshalJSON(b []byte) error { + var o struct { + Results []*Result `json:"results,omitempty"` + Err string `json:"error,omitempty"` + } + + err := json.Unmarshal(b, &o) + if err != nil { + return err + } + r.Results = o.Results + if o.Err != "" { + r.Err = errors.New(o.Err) + } + return nil +} + +// Error returns the first error from any statement. +// Returns nil if no errors occurred on any statements. +func (a Results) Error() error { + for _, r := range a.Results { + if r.Err != nil { + return r.Err + } + } + return nil +} + +// Point defines the values that will be written to the database +type Point struct { + Name string + Tags map[string]string + Timestamp time.Time + Values map[string]interface{} +} + +// UnmarshalJSON decodes the data into the Point struct +func (p *Point) UnmarshalJSON(b []byte) error { + var normal struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` + Values map[string]interface{} `json:"values"` + } + var epoch struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Timestamp int64 `json:"timestamp"` + Precision string `json:"precision"` + Values map[string]interface{} `json:"values"` + } + + if err := func() error { + var err error + if err = json.Unmarshal(b, &epoch); err != nil { + return err + } + // Convert from epoch to time.Time + ts, err := EpochToTime(epoch.Timestamp, epoch.Precision) + if err != nil { + return err + } + p.Name = epoch.Name + p.Tags = epoch.Tags + p.Timestamp = ts + p.Values = epoch.Values + return nil + }(); err == nil { + return nil + } + + if err := json.Unmarshal(b, &normal); err != nil { + return err + } + p.Name = normal.Name + p.Tags = normal.Tags + p.Timestamp = normal.Timestamp + p.Values = normal.Values + + return nil +} + // utility functions func (c *Client) Addr() string { diff --git a/client/influxdb_test.go b/client/influxdb_test.go index e0cc6f6609..775aa5a7f8 100644 --- a/client/influxdb_test.go +++ b/client/influxdb_test.go @@ -2,6 +2,7 @@ package client_test import ( "encoding/json" + "fmt" "net/http" "net/http/httptest" "net/url" @@ -117,6 +118,117 @@ func TestClient_Write(t *testing.T) { } } +func TestPoint_UnmarshalEpoch(t *testing.T) { + now := time.Now() + tests := []struct { + name string + epoch int64 + precision string + expected time.Time + }{ + { + name: "nanoseconds", + epoch: now.UnixNano(), + precision: "n", + expected: now, + }, + { + name: "microseconds", + epoch: now.Round(time.Microsecond).UnixNano() / int64(time.Microsecond), + precision: "u", + expected: now.Round(time.Microsecond), + }, + { + name: "milliseconds", + epoch: now.Round(time.Millisecond).UnixNano() / int64(time.Millisecond), + precision: "ms", + expected: now.Round(time.Millisecond), + }, + { + name: "seconds", + epoch: now.Round(time.Second).UnixNano() / int64(time.Second), + precision: "s", + expected: now.Round(time.Second), + }, + { + name: "minutes", + epoch: now.Round(time.Minute).UnixNano() / int64(time.Minute), + precision: "m", + expected: now.Round(time.Minute), + }, + { + name: "hours", + epoch: now.Round(time.Hour).UnixNano() / int64(time.Hour), + precision: "h", + expected: now.Round(time.Hour), + }, + { + name: "max int64", + epoch: 9223372036854775807, + precision: "n", + expected: time.Unix(0, 9223372036854775807), + }, + { + name: "100 years from now", + epoch: now.Add(time.Hour * 24 * 365 * 100).UnixNano(), + precision: "n", + expected: now.Add(time.Hour * 24 * 365 * 100), + }, + } + + for _, test := range tests { + t.Logf("testing %q\n", test.name) + data := []byte(fmt.Sprintf(`{"timestamp": %d, "precision":"%s"}`, test.epoch, test.precision)) + t.Logf("json: %s", string(data)) + var p client.Point + err := json.Unmarshal(data, &p) + if err != nil { + t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) + } + if p.Timestamp != test.expected { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp) + } + } +} + +func TestPoint_UnmarshalRFC(t *testing.T) { + now := time.Now() + tests := []struct { + name string + rfc string + now time.Time + expected time.Time + }{ + { + name: "RFC3339Nano", + rfc: time.RFC3339Nano, + now: now, + expected: now, + }, + { + name: "RFC3339", + rfc: time.RFC3339, + now: now.Round(time.Second), + expected: now.Round(time.Second), + }, + } + + for _, test := range tests { + t.Logf("testing %q\n", test.name) + ts := test.now.Format(test.rfc) + data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts)) + t.Logf("json: %s", string(data)) + var p client.Point + err := json.Unmarshal(data, &p) + if err != nil { + t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) + } + if p.Timestamp != test.expected { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp) + } + } +} + func TestEpochToTime(t *testing.T) { now := time.Now() diff --git a/server.go b/server.go index 0527a8898a..346c70ef21 100644 --- a/server.go +++ b/server.go @@ -1313,48 +1313,6 @@ type Point struct { Values map[string]interface{} } -// UnmarshalJSON decodes the data into the Point struct -func (p *Point) UnmarshalJSON(b []byte) error { - var normal struct { - Name string - Tags map[string]string - Timestamp time.Time - Values map[string]interface{} - } - var epoch struct { - Name string - Tags map[string]string - Timestamp string `json:",string"` - Precision string // Can be h, m, s, ms, u, n, defaults to s if blank - Values map[string]interface{} - } - - err := json.Unmarshal(b, &epoch) - if err == nil { - // Convert from epoch to time.Time - switch epoch.Precision { - case "h": - case "m": - case "ms": - case "u": - case "n": - default: // defaults to seconds - - } - } - - err = json.Unmarshal(b, &normal) - if err != nil { - p = &Point{ - Name: normal.Name, - Tags: normal.Tags, - Timestamp: normal.Timestamp, - Values: normal.Values, - } - } - return nil -} - // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) { diff --git a/server_test.go b/server_test.go index b22ffbdd14..8824ec79be 100644 --- a/server_test.go +++ b/server_test.go @@ -900,9 +900,9 @@ func TestServer_TagNamesBySeries(t *testing.T) { t.Skip("pending") } func TestServer_TagValues(t *testing.T) { t.Skip("pending") } func TestServer_TagValuesBySeries(t *testing.T) { t.Skip("pending") } -// Point tests +// Point JSON Unmarshal tests -func TestPoint_UnmarshalEpoch(t *testing.T) { +func TestbatchWrite_UnmarshalEpoch(t *testing.T) { var ( now = time.Now() nanos = now.UnixNano() @@ -926,7 +926,9 @@ func TestPoint_UnmarshalEpoch(t *testing.T) { } for _, test := range tests { - log.Println(test.name) + json := fmt.Sprintf(`"points": [{timestamp: "%d"}`, test.epoch) + log.Println(json) + t.Fatal("foo") } } From 88759a7ecc537fd87aae6da1fed4fca318a257d6 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 29 Jan 2015 15:16:05 -0700 Subject: [PATCH 07/10] time should marshal into UTC and nanoseconds --- client/influxdb.go | 24 ++++++++++++++++++------ client/influxdb_test.go | 4 ++-- httpd/handler.go | 19 +++++++++++++------ 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index 8ccf41dcf0..dc0bdc80ac 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -205,12 +205,24 @@ func (a Results) Error() error { return nil } +type Timestamp time.Time + +func (t Timestamp) Time() time.Time { + return time.Time(t) +} + +func (t Timestamp) MarshalJSON() ([]byte, error) { + // Always send back in UTC with nanoseconds + s := t.Time().UTC().Format(time.RFC3339Nano) + return []byte(`"` + s + `"`), nil +} + // Point defines the values that will be written to the database type Point struct { - Name string - Tags map[string]string - Timestamp time.Time - Values map[string]interface{} + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Timestamp Timestamp `json:"timestamp"` + Values map[string]interface{} `json:"values"` } // UnmarshalJSON decodes the data into the Point struct @@ -241,7 +253,7 @@ func (p *Point) UnmarshalJSON(b []byte) error { } p.Name = epoch.Name p.Tags = epoch.Tags - p.Timestamp = ts + p.Timestamp = Timestamp(ts) p.Values = epoch.Values return nil }(); err == nil { @@ -253,7 +265,7 @@ func (p *Point) UnmarshalJSON(b []byte) error { } p.Name = normal.Name p.Tags = normal.Tags - p.Timestamp = normal.Timestamp + p.Timestamp = Timestamp(normal.Timestamp) p.Values = normal.Values return nil diff --git a/client/influxdb_test.go b/client/influxdb_test.go index 775aa5a7f8..70c76438a5 100644 --- a/client/influxdb_test.go +++ b/client/influxdb_test.go @@ -185,7 +185,7 @@ func TestPoint_UnmarshalEpoch(t *testing.T) { if err != nil { t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) } - if p.Timestamp != test.expected { + if p.Timestamp.Time() != test.expected { t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp) } } @@ -223,7 +223,7 @@ func TestPoint_UnmarshalRFC(t *testing.T) { if err != nil { t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) } - if p.Timestamp != test.expected { + if p.Timestamp.Time() != test.expected { t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp) } } diff --git a/httpd/handler.go b/httpd/handler.go index 20b446bd13..0fb751e55e 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -142,7 +142,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ } type BatchWrite struct { - Points []influxdb.Point `json:"points"` + Points []client.Point `json:"points"` Database string `json:"database"` RetentionPolicy string `json:"retentionPolicy"` Tags map[string]string `json:"tags"` @@ -152,14 +152,14 @@ type BatchWrite struct { // UnmarshalJSON decodes the data into the batchWrite struct func (br *BatchWrite) UnmarshalJSON(b []byte) error { var normal struct { - Points []influxdb.Point `json:"points"` + Points []client.Point `json:"points"` Database string `json:"database"` RetentionPolicy string `json:"retentionPolicy"` Tags map[string]string `json:"tags"` Timestamp time.Time `json:"timestamp"` } var epoch struct { - Points []influxdb.Point `json:"points"` + Points []client.Point `json:"points"` Database string `json:"database"` RetentionPolicy string `json:"retentionPolicy"` Tags map[string]string `json:"tags"` @@ -238,8 +238,8 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ } for _, p := range br.Points { - if p.Timestamp.IsZero() { - p.Timestamp = br.Timestamp + if p.Timestamp.Time().IsZero() { + p.Timestamp = client.Timestamp(br.Timestamp) } if len(br.Tags) > 0 { for k, _ := range br.Tags { @@ -248,7 +248,14 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ } } } - if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []influxdb.Point{p}); err != nil { + // Need to convert from a client.Point to a influxdb.Point + iPoint := influxdb.Point{ + Name: p.Name, + Tags: p.Tags, + Timestamp: p.Timestamp.Time(), + Values: p.Values, + } + if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []influxdb.Point{iPoint}); err != nil { writeError(influxdb.Result{Err: err}, http.StatusInternalServerError) return } From b6725501f46d0aa318d329f0c6d94e00063ab6ac Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 29 Jan 2015 16:22:43 -0700 Subject: [PATCH 08/10] Rounding precision on writing --- client/influxdb.go | 20 ++++++++++++++++++++ httpd/handler.go | 2 ++ 2 files changed, 22 insertions(+) diff --git a/client/influxdb.go b/client/influxdb.go index dc0bdc80ac..cfb8ba765c 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -231,6 +231,7 @@ func (p *Point) UnmarshalJSON(b []byte) error { Name string `json:"name"` Tags map[string]string `json:"tags"` Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` Values map[string]interface{} `json:"values"` } var epoch struct { @@ -263,6 +264,7 @@ func (p *Point) UnmarshalJSON(b []byte) error { if err := json.Unmarshal(b, &normal); err != nil { return err } + normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision) p.Name = normal.Name p.Tags = normal.Tags p.Timestamp = Timestamp(normal.Timestamp) @@ -317,6 +319,24 @@ func EpochToTime(epoch int64, precision string) (time.Time, error) { } +// SetPrecision will round a time to the specified precision +func SetPrecision(t time.Time, precision string) time.Time { + switch precision { + case "n": + case "u": + return t.Round(time.Microsecond) + case "ms": + return t.Round(time.Millisecond) + case "s": + return t.Round(time.Second) + case "m": + return t.Round(time.Minute) + case "h": + return t.Round(time.Hour) + } + return t +} + func detect(values ...string) string { for _, v := range values { if v != "" { diff --git a/httpd/handler.go b/httpd/handler.go index 0fb751e55e..445874c890 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -157,6 +157,7 @@ func (br *BatchWrite) UnmarshalJSON(b []byte) error { RetentionPolicy string `json:"retentionPolicy"` Tags map[string]string `json:"tags"` Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` } var epoch struct { Points []client.Point `json:"points"` @@ -190,6 +191,7 @@ func (br *BatchWrite) UnmarshalJSON(b []byte) error { if err := json.Unmarshal(b, &normal); err != nil { return err } + normal.Timestamp = client.SetPrecision(normal.Timestamp, normal.Precision) br.Points = normal.Points br.Database = normal.Database br.RetentionPolicy = normal.RetentionPolicy From b6ceca4c1c0f2415f638b4615ff6746e02e8bba9 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 29 Jan 2015 16:33:31 -0700 Subject: [PATCH 09/10] godoc comments and misc formatting --- client/influxdb.go | 18 ++++-------------- httpd/handler.go | 1 + 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index cfb8ba765c..2b22d2de33 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -205,12 +205,15 @@ func (a Results) Error() error { return nil } +// Timestamp is a custom type so we marshal JSON properly into UTC nanosecond time type Timestamp time.Time +// Time returns the time represented by the Timestamp func (t Timestamp) Time() time.Time { return time.Time(t) } +// MarshalJSON returns time in UTC with nanoseconds func (t Timestamp) MarshalJSON() ([]byte, error) { // Always send back in UTC with nanoseconds s := t.Time().UTC().Format(time.RFC3339Nano) @@ -279,21 +282,9 @@ func (c *Client) Addr() string { return c.url.String() } -//func (c *Client) urlFor(path string) (*url.URL, error) { -//var u *url.URL -//u, err := url.Parse(fmt.Sprintf("%s%s", c.addr, path)) -//if err != nil { -//return nil, err -//} -//if c.username != "" { -//u.User = url.UserPassword(c.username, c.password) -//} -//u.Scheme = "http" -//return u, nil -//} - // helper functions +// EpochToTime takes a unix epoch time and uses precision to return back a time.Time func EpochToTime(epoch int64, precision string) (time.Time, error) { if precision == "" { precision = "s" @@ -316,7 +307,6 @@ func EpochToTime(epoch int64, precision string) (time.Time, error) { return time.Time{}, fmt.Errorf("Unknowm precision %q", precision) } return t, nil - } // SetPrecision will round a time to the specified precision diff --git a/httpd/handler.go b/httpd/handler.go index 445874c890..fff9b5a544 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -141,6 +141,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ httpResults(w, results, pretty) } +// BatchWrite is used to send batch write data to the http /write endpoint type BatchWrite struct { Points []client.Point `json:"points"` Database string `json:"database"` From c9e4eea836c5370c31cd5b7778c4f4578213e017 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 29 Jan 2015 16:44:10 -0700 Subject: [PATCH 10/10] inherit precision from top level object when writing points --- client/influxdb.go | 3 +++ httpd/handler.go | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/client/influxdb.go b/client/influxdb.go index 2b22d2de33..e46137a2fd 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -226,6 +226,7 @@ type Point struct { Tags map[string]string `json:"tags"` Timestamp Timestamp `json:"timestamp"` Values map[string]interface{} `json:"values"` + Precision string `json:"precision"` } // UnmarshalJSON decodes the data into the Point struct @@ -258,6 +259,7 @@ func (p *Point) UnmarshalJSON(b []byte) error { p.Name = epoch.Name p.Tags = epoch.Tags p.Timestamp = Timestamp(ts) + p.Precision = epoch.Precision p.Values = epoch.Values return nil }(); err == nil { @@ -271,6 +273,7 @@ func (p *Point) UnmarshalJSON(b []byte) error { p.Name = normal.Name p.Tags = normal.Tags p.Timestamp = Timestamp(normal.Timestamp) + p.Precision = normal.Precision p.Values = normal.Values return nil diff --git a/httpd/handler.go b/httpd/handler.go index fff9b5a544..87895f1ac3 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -148,6 +148,7 @@ type BatchWrite struct { RetentionPolicy string `json:"retentionPolicy"` Tags map[string]string `json:"tags"` Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` } // UnmarshalJSON decodes the data into the batchWrite struct @@ -184,6 +185,7 @@ func (br *BatchWrite) UnmarshalJSON(b []byte) error { br.RetentionPolicy = epoch.RetentionPolicy br.Tags = epoch.Tags br.Timestamp = ts + br.Precision = epoch.Precision return nil }(); err == nil { return nil @@ -198,6 +200,7 @@ func (br *BatchWrite) UnmarshalJSON(b []byte) error { br.RetentionPolicy = normal.RetentionPolicy br.Tags = normal.Tags br.Timestamp = normal.Timestamp + br.Precision = normal.Precision return nil } @@ -244,6 +247,10 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ if p.Timestamp.Time().IsZero() { p.Timestamp = client.Timestamp(br.Timestamp) } + if p.Precision == "" && br.Precision != "" { + p.Precision = br.Precision + } + p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision)) if len(br.Tags) > 0 { for k, _ := range br.Tags { if p.Tags[k] == "" {