From bda968552618253a9ef43f9c1878fbf7cfe8447a Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 6 Mar 2015 10:29:32 -0700 Subject: [PATCH] Use BatchPoints for writing from client library --- client/influxdb.go | 92 ++++++++++++++++++++++++++------- client/influxdb_test.go | 47 +++++++++++++++-- httpd/handler.go | 3 +- httpd/handler_test.go | 5 +- influxdb.go | 109 ---------------------------------------- influxdb_test.go | 46 ----------------- server.go | 40 +++++++++++++++ udp/udp.go | 6 ++- 8 files changed, 166 insertions(+), 182 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index 7827d45808..08ec39e08c 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -12,6 +12,11 @@ import ( "github.com/influxdb/influxdb/influxql" ) +type Query struct { + Command string + Database string +} + type Config struct { URL url.URL Username string @@ -27,17 +32,6 @@ type Client struct { userAgent string } -type Query struct { - Command string - Database string -} - -type Write struct { - Database string - RetentionPolicy string - Points []Point -} - func NewClient(c Config) (*Client, error) { client := Client{ url: c.URL, @@ -49,6 +43,73 @@ func NewClient(c Config) (*Client, error) { return &client, nil } +// BatchPoints is used to send batched data in a single write. +type BatchPoints struct { + Points []Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` +} + +// UnmarshalJSON decodes the data into the BatchPoints struct +func (bp *BatchPoints) UnmarshalJSON(b []byte) error { + var normal struct { + Points []Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` + } + var epoch struct { + Points []Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp *int64 `json:"timestamp"` + Precision string `json:"precision"` + } + + if err := func() error { + var err error + if err = json.Unmarshal(b, &epoch); err != nil { + return err + } + // Convert from epoch to time.Time + var ts time.Time + if epoch.Timestamp != nil { + ts, err = EpochToTime(*epoch.Timestamp, epoch.Precision) + if err != nil { + return err + } + } + bp.Points = epoch.Points + bp.Database = epoch.Database + bp.RetentionPolicy = epoch.RetentionPolicy + bp.Tags = epoch.Tags + bp.Timestamp = ts + bp.Precision = epoch.Precision + return nil + }(); err == nil { + return nil + } + + if err := json.Unmarshal(b, &normal); err != nil { + return err + } + normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision) + bp.Points = normal.Points + bp.Database = normal.Database + bp.RetentionPolicy = normal.RetentionPolicy + bp.Tags = normal.Tags + bp.Timestamp = normal.Timestamp + bp.Precision = normal.Precision + + return nil +} + func (c *Client) Query(q Query) (*Results, error) { u := c.url @@ -81,7 +142,7 @@ func (c *Client) Query(q Query) (*Results, error) { return &results, nil } -func (c *Client) Write(writes ...Write) (*Results, error) { +func (c *Client) Write(bp BatchPoints) (*Results, error) { c.url.Path = "write" type data struct { Points []Point `json:"points"` @@ -89,13 +150,8 @@ func (c *Client) Write(writes ...Write) (*Results, error) { RetentionPolicy string `json:"retentionPolicy"` } - d := []data{} - for _, write := range writes { - d = append(d, data{Points: write.Points, Database: write.Database, RetentionPolicy: write.RetentionPolicy}) - } - b := []byte{} - err := json.Unmarshal(b, &d) + err := json.Unmarshal(b, &bp) req, err := http.NewRequest("POST", c.url.String(), bytes.NewBuffer(b)) if err != nil { diff --git a/client/influxdb_test.go b/client/influxdb_test.go index e0efc73444..9430891b0c 100644 --- a/client/influxdb_test.go +++ b/client/influxdb_test.go @@ -111,8 +111,8 @@ func TestClient_Write(t *testing.T) { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) } - write := client.Write{} - _, err = c.Write(write) + bp := client.BatchPoints{} + _, err = c.Write(bp) if err != nil { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) } @@ -172,8 +172,8 @@ func TestClient_UserAgent(t *testing.T) { } receivedUserAgent = "" - write := client.Write{} - _, err = c.Write(write) + bp := client.BatchPoints{} + _, err = c.Write(bp) if err != nil { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) } @@ -340,3 +340,42 @@ func emptyTestServer() *httptest.Server { return })) } + +// Ensure that data with epoch timestamps can be decoded. +func TestBatchPoints_Normal(t *testing.T) { + var bp client.BatchPoints + data := []byte(` +{ + "database": "foo", + "retentionPolicy": "bar", + "points": [ + { + "name": "cpu", + "tags": { + "host": "server01" + }, + "timestamp": 14244733039069373, + "precision": "n", + "values": { + "value": 4541770385657154000 + } + }, + { + "name": "cpu", + "tags": { + "host": "server01" + }, + "timestamp": 14244733039069380, + "precision": "n", + "values": { + "value": 7199311900554737000 + } + } + ] +} +`) + + if err := json.Unmarshal(data, &bp); err != nil { + t.Errorf("failed to unmarshal nanosecond data: %s", err.Error()) + } +} diff --git a/httpd/handler.go b/httpd/handler.go index 83c27cefd2..dc443e1c86 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -22,6 +22,7 @@ import ( "github.com/bmizerany/pat" "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/influxql" ) @@ -176,7 +177,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ // serveWrite receives incoming series data and writes it to the database. func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) { - var bp influxdb.BatchPoints + var bp client.BatchPoints var dec *json.Decoder if h.WriteTrace { diff --git a/httpd/handler_test.go b/httpd/handler_test.go index f975650913..9080d5332d 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -16,6 +16,7 @@ import ( "time" "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/httpd" "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/messaging" @@ -87,7 +88,7 @@ func TestBatchWrite_UnmarshalEpoch(t *testing.T) { t.Logf("testing %q\n", test.name) data := []byte(fmt.Sprintf(`{"timestamp": %d, "precision":"%s"}`, test.epoch, test.precision)) t.Logf("json: %s", string(data)) - var bp influxdb.BatchPoints + var bp client.BatchPoints err := json.Unmarshal(data, &bp) if err != nil { t.Fatalf("unexpected error. expected: %v, actual: %v", nil, err) @@ -125,7 +126,7 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) { ts := test.now.Format(test.rfc) data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts)) t.Logf("json: %s", string(data)) - var bp influxdb.BatchPoints + var bp client.BatchPoints err := json.Unmarshal(data, &bp) if err != nil { t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) diff --git a/influxdb.go b/influxdb.go index 17be925667..918b8a547e 100644 --- a/influxdb.go +++ b/influxdb.go @@ -5,9 +5,6 @@ import ( "errors" "fmt" "os" - "time" - - "github.com/influxdb/influxdb/client" ) var ( @@ -130,112 +127,6 @@ var ( ErrContinuousQueryExists = errors.New("continuous query already exists") ) -// BatchPoints is used to send batched data in a single write. -type BatchPoints struct { - Points []client.Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Tags map[string]string `json:"tags"` - Timestamp time.Time `json:"timestamp"` - Precision string `json:"precision"` -} - -// UnmarshalJSON decodes the data into the BatchPoints struct -func (bp *BatchPoints) UnmarshalJSON(b []byte) error { - var normal struct { - Points []client.Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Tags map[string]string `json:"tags"` - Timestamp time.Time `json:"timestamp"` - Precision string `json:"precision"` - } - var epoch struct { - Points []client.Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Tags map[string]string `json:"tags"` - Timestamp *int64 `json:"timestamp"` - Precision string `json:"precision"` - } - - if err := func() error { - var err error - if err = json.Unmarshal(b, &epoch); err != nil { - return err - } - // Convert from epoch to time.Time - var ts time.Time - if epoch.Timestamp != nil { - ts, err = client.EpochToTime(*epoch.Timestamp, epoch.Precision) - if err != nil { - return err - } - } - bp.Points = epoch.Points - bp.Database = epoch.Database - bp.RetentionPolicy = epoch.RetentionPolicy - bp.Tags = epoch.Tags - bp.Timestamp = ts - bp.Precision = epoch.Precision - return nil - }(); err == nil { - return nil - } - - if err := json.Unmarshal(b, &normal); err != nil { - return err - } - normal.Timestamp = client.SetPrecision(normal.Timestamp, normal.Precision) - bp.Points = normal.Points - bp.Database = normal.Database - bp.RetentionPolicy = normal.RetentionPolicy - bp.Tags = normal.Tags - bp.Timestamp = normal.Timestamp - bp.Precision = normal.Precision - - return nil -} - -// NormalizeBatchPoints returns a slice of Points, created by populating individual -// points within the batch, which do not have timestamps or tags, with the top-level -// values. -func NormalizeBatchPoints(bp BatchPoints) ([]Point, error) { - points := []Point{} - for _, p := range bp.Points { - if p.Timestamp.Time().IsZero() { - if bp.Timestamp.IsZero() { - p.Timestamp = client.Timestamp(time.Now()) - } else { - p.Timestamp = client.Timestamp(bp.Timestamp) - } - } - if p.Precision == "" && bp.Precision != "" { - p.Precision = bp.Precision - } - p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision)) - if len(bp.Tags) > 0 { - if p.Tags == nil { - p.Tags = make(map[string]string) - } - for k := range bp.Tags { - if p.Tags[k] == "" { - p.Tags[k] = bp.Tags[k] - } - } - } - // Need to convert from a client.Point to a influxdb.Point - points = append(points, Point{ - Name: p.Name, - Tags: p.Tags, - Timestamp: p.Timestamp.Time(), - Fields: p.Fields, - }) - } - - return points, nil -} - // ErrAuthorize represents an authorization error. type ErrAuthorize struct { text string diff --git a/influxdb_test.go b/influxdb_test.go index bba4ba8b24..daba8c9362 100644 --- a/influxdb_test.go +++ b/influxdb_test.go @@ -1,47 +1 @@ package influxdb_test - -import ( - "encoding/json" - "testing" - - "github.com/influxdb/influxdb" -) - -// Ensure that data with epoch timestamps can be decoded. -func TestBatchPoints_Normal(t *testing.T) { - var p influxdb.BatchPoints - data := []byte(` -{ - "database": "foo", - "retentionPolicy": "bar", - "points": [ - { - "name": "cpu", - "tags": { - "host": "server01" - }, - "timestamp": 14244733039069373, - "precision": "n", - "values": { - "value": 4541770385657154000 - } - }, - { - "name": "cpu", - "tags": { - "host": "server01" - }, - "timestamp": 14244733039069380, - "precision": "n", - "values": { - "value": 7199311900554737000 - } - } - ] -} -`) - - if err := json.Unmarshal(data, &p); err != nil { - t.Errorf("failed to unmarshal nanosecond data: %s", err.Error()) - } -} diff --git a/server.go b/server.go index 5e33d776fb..f458f55fc9 100644 --- a/server.go +++ b/server.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/messaging" "golang.org/x/crypto/bcrypt" @@ -1367,6 +1368,45 @@ type Point struct { Fields map[string]interface{} } +// NormalizeBatchPoints returns a slice of Points, created by populating individual +// points within the batch, which do not have timestamps or tags, with the top-level +// values. +func NormalizeBatchPoints(bp client.BatchPoints) ([]Point, error) { + points := []Point{} + for _, p := range bp.Points { + if p.Timestamp.Time().IsZero() { + if bp.Timestamp.IsZero() { + p.Timestamp = client.Timestamp(time.Now()) + } else { + p.Timestamp = client.Timestamp(bp.Timestamp) + } + } + if p.Precision == "" && bp.Precision != "" { + p.Precision = bp.Precision + } + p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision)) + if len(bp.Tags) > 0 { + if p.Tags == nil { + p.Tags = make(map[string]string) + } + for k := range bp.Tags { + if p.Tags[k] == "" { + p.Tags[k] = bp.Tags[k] + } + } + } + // Need to convert from a client.Point to a influxdb.Point + points = append(points, Point{ + Name: p.Name, + Tags: p.Tags, + Timestamp: p.Timestamp.Time(), + Fields: p.Fields, + }) + } + + return points, nil +} + // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) { diff --git a/udp/udp.go b/udp/udp.go index 8ad8f66cef..5b1cde72d7 100644 --- a/udp/udp.go +++ b/udp/udp.go @@ -3,9 +3,11 @@ package udp import ( "bytes" "encoding/json" - "github.com/influxdb/influxdb" "log" "net" + + "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/client" ) const ( @@ -44,7 +46,7 @@ func (u *UDPServer) ListenAndServe(iface string) error { return err } - var bp influxdb.BatchPoints + var bp client.BatchPoints buf := make([]byte, udpBufferSize) go func() {