From 4f5ad7399e13d3719b00cea978fc79d74184b511 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 6 Mar 2015 11:20:30 -0700 Subject: [PATCH] add tests for NormalizeBatchPoints. Remove custom type client.Timestamp --- client/influxdb.go | 158 ++++++++++++++++++++++------------------ client/influxdb_test.go | 8 +- influxdb.go | 42 +++++++++++ influxdb_test.go | 74 +++++++++++++++++++ server.go | 40 ---------- 5 files changed, 208 insertions(+), 114 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index 08ec39e08c..ff92c2d101 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -43,73 +43,6 @@ func NewClient(c Config) (*Client, error) { return &client, nil } -// BatchPoints is used to send batched data in a single write. -type BatchPoints struct { - Points []Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Tags map[string]string `json:"tags"` - Timestamp time.Time `json:"timestamp"` - Precision string `json:"precision"` -} - -// UnmarshalJSON decodes the data into the BatchPoints struct -func (bp *BatchPoints) UnmarshalJSON(b []byte) error { - var normal struct { - Points []Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Tags map[string]string `json:"tags"` - Timestamp time.Time `json:"timestamp"` - Precision string `json:"precision"` - } - var epoch struct { - Points []Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Tags map[string]string `json:"tags"` - Timestamp *int64 `json:"timestamp"` - Precision string `json:"precision"` - } - - if err := func() error { - var err error - if err = json.Unmarshal(b, &epoch); err != nil { - return err - } - // Convert from epoch to time.Time - var ts time.Time - if epoch.Timestamp != nil { - ts, err = EpochToTime(*epoch.Timestamp, epoch.Precision) - if err != nil { - return err - } - } - bp.Points = epoch.Points - bp.Database = epoch.Database - bp.RetentionPolicy = epoch.RetentionPolicy - bp.Tags = epoch.Tags - bp.Timestamp = ts - bp.Precision = epoch.Precision - return nil - }(); err == nil { - return nil - } - - if err := json.Unmarshal(b, &normal); err != nil { - return err - } - normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision) - bp.Points = normal.Points - bp.Database = normal.Database - bp.RetentionPolicy = normal.RetentionPolicy - bp.Tags = normal.Tags - bp.Timestamp = normal.Timestamp - bp.Precision = normal.Precision - - return nil -} - func (c *Client) Query(q Query) (*Results, error) { u := c.url @@ -318,11 +251,29 @@ func (t Timestamp) MarshalJSON() ([]byte, error) { type Point struct { Name string `json:"name"` Tags map[string]string `json:"tags"` - Timestamp Timestamp `json:"timestamp"` + Timestamp time.Time `json:"timestamp"` Fields map[string]interface{} `json:"fields"` Precision string `json:"precision"` } +// MarshalJSON will format the time in RFC3339Nano +func (p *Point) MarshalJSON() ([]byte, error) { + point := struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Timestamp string `json:"timestamp"` + Fields map[string]interface{} `json:"fields"` + Precision string `json:"precision"` + }{ + Name: p.Name, + Tags: p.Tags, + Timestamp: p.Timestamp.UTC().Format(time.RFC3339Nano), + Fields: p.Fields, + Precision: p.Precision, + } + return json.Marshal(&point) +} + // UnmarshalJSON decodes the data into the Point struct func (p *Point) UnmarshalJSON(b []byte) error { var normal struct { @@ -358,7 +309,7 @@ func (p *Point) UnmarshalJSON(b []byte) error { } p.Name = epoch.Name p.Tags = epoch.Tags - p.Timestamp = Timestamp(ts) + p.Timestamp = ts p.Precision = epoch.Precision p.Fields = normalizeFields(epoch.Fields) return nil @@ -374,7 +325,7 @@ func (p *Point) UnmarshalJSON(b []byte) error { normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision) p.Name = normal.Name p.Tags = normal.Tags - p.Timestamp = Timestamp(normal.Timestamp) + p.Timestamp = normal.Timestamp p.Precision = normal.Precision p.Fields = normalizeFields(normal.Fields) @@ -400,6 +351,73 @@ func normalizeFields(fields map[string]interface{}) map[string]interface{} { return newFields } +// BatchPoints is used to send batched data in a single write. +type BatchPoints struct { + Points []Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` +} + +// UnmarshalJSON decodes the data into the BatchPoints struct +func (bp *BatchPoints) UnmarshalJSON(b []byte) error { + var normal struct { + Points []Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` + } + var epoch struct { + Points []Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp *int64 `json:"timestamp"` + Precision string `json:"precision"` + } + + if err := func() error { + var err error + if err = json.Unmarshal(b, &epoch); err != nil { + return err + } + // Convert from epoch to time.Time + var ts time.Time + if epoch.Timestamp != nil { + ts, err = EpochToTime(*epoch.Timestamp, epoch.Precision) + if err != nil { + return err + } + } + bp.Points = epoch.Points + bp.Database = epoch.Database + bp.RetentionPolicy = epoch.RetentionPolicy + bp.Tags = epoch.Tags + bp.Timestamp = ts + bp.Precision = epoch.Precision + return nil + }(); err == nil { + return nil + } + + if err := json.Unmarshal(b, &normal); err != nil { + return err + } + normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision) + bp.Points = normal.Points + bp.Database = normal.Database + bp.RetentionPolicy = normal.RetentionPolicy + bp.Tags = normal.Tags + bp.Timestamp = normal.Timestamp + bp.Precision = normal.Precision + + return nil +} + // utility functions func (c *Client) Addr() string { diff --git a/client/influxdb_test.go b/client/influxdb_test.go index 9430891b0c..7fd0eb110c 100644 --- a/client/influxdb_test.go +++ b/client/influxdb_test.go @@ -259,8 +259,8 @@ func TestPoint_UnmarshalEpoch(t *testing.T) { if err != nil { t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) } - if !p.Timestamp.Time().Equal(test.expected) { - t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp.Time()) + if !p.Timestamp.Equal(test.expected) { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp) } } } @@ -297,8 +297,8 @@ func TestPoint_UnmarshalRFC(t *testing.T) { if err != nil { t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) } - if !p.Timestamp.Time().Equal(test.expected) { - t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp.Time()) + if !p.Timestamp.Equal(test.expected) { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp) } } } diff --git a/influxdb.go b/influxdb.go index 918b8a547e..07c9b5768b 100644 --- a/influxdb.go +++ b/influxdb.go @@ -5,6 +5,9 @@ import ( "errors" "fmt" "os" + "time" + + "github.com/influxdb/influxdb/client" ) var ( @@ -177,3 +180,42 @@ func assert(condition bool, msg string, v ...interface{}) { func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } + +// NormalizeBatchPoints returns a slice of Points, created by populating individual +// points within the batch, which do not have timestamps or tags, with the top-level +// values. +func NormalizeBatchPoints(bp client.BatchPoints) ([]Point, error) { + points := []Point{} + for _, p := range bp.Points { + if p.Timestamp.IsZero() { + if bp.Timestamp.IsZero() { + p.Timestamp = time.Now() + } else { + p.Timestamp = bp.Timestamp + } + } + if p.Precision == "" && bp.Precision != "" { + p.Precision = bp.Precision + } + p.Timestamp = client.SetPrecision(p.Timestamp, p.Precision) + if len(bp.Tags) > 0 { + if p.Tags == nil { + p.Tags = make(map[string]string) + } + for k := range bp.Tags { + if p.Tags[k] == "" { + p.Tags[k] = bp.Tags[k] + } + } + } + // Need to convert from a client.Point to a influxdb.Point + points = append(points, Point{ + Name: p.Name, + Tags: p.Tags, + Timestamp: p.Timestamp, + Fields: p.Fields, + }) + } + + return points, nil +} diff --git a/influxdb_test.go b/influxdb_test.go index daba8c9362..bcd7afcdf1 100644 --- a/influxdb_test.go +++ b/influxdb_test.go @@ -1 +1,75 @@ package influxdb_test + +import ( + "reflect" + "testing" + "time" + + "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/client" +) + +func TestNormalizeBatchPoints(t *testing.T) { + now := time.Now() + tests := []struct { + name string + bp client.BatchPoints + p []influxdb.Point + err string + }{ + { + name: "default", + bp: client.BatchPoints{ + Points: []client.Point{ + {Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}}, + }, + }, + p: []influxdb.Point{ + {Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}}, + }, + }, + { + name: "merge timestamp", + bp: client.BatchPoints{ + Timestamp: now, + Points: []client.Point{ + {Name: "cpu", Tags: map[string]string{"region": "useast"}, Fields: map[string]interface{}{"value": 1.0}}, + }, + }, + p: []influxdb.Point{ + {Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}}, + }, + }, + { + name: "merge tags", + bp: client.BatchPoints{ + Tags: map[string]string{"day": "monday"}, + Points: []client.Point{ + {Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}}, + {Name: "memory", Timestamp: now, Fields: map[string]interface{}{"value": 2.0}}, + }, + }, + p: []influxdb.Point{ + {Name: "cpu", Tags: map[string]string{"day": "monday", "region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}}, + {Name: "memory", Tags: map[string]string{"day": "monday"}, Timestamp: now, Fields: map[string]interface{}{"value": 2.0}}, + }, + }, + } + + for _, test := range tests { + t.Logf("running test %q", test.name) + p, e := influxdb.NormalizeBatchPoints(test.bp) + if test.err == "" && e != nil { + t.Error("unexpected error %s", e) + } else if test.err != "" && e == nil { + t.Error("expected error %s, got ", test.err) + } else if e != nil && test.err != e.Error() { + t.Error("unexpected error. expected: %s, got %s", test.err, e) + } + if !reflect.DeepEqual(p, test.p) { + t.Logf("expected: %+v", test.p) + t.Logf("got: %+v", p) + t.Error("failed to normalize.") + } + } +} diff --git a/server.go b/server.go index f458f55fc9..5e33d776fb 100644 --- a/server.go +++ b/server.go @@ -19,7 +19,6 @@ import ( "sync" "time" - "github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/messaging" "golang.org/x/crypto/bcrypt" @@ -1368,45 +1367,6 @@ type Point struct { Fields map[string]interface{} } -// NormalizeBatchPoints returns a slice of Points, created by populating individual -// points within the batch, which do not have timestamps or tags, with the top-level -// values. -func NormalizeBatchPoints(bp client.BatchPoints) ([]Point, error) { - points := []Point{} - for _, p := range bp.Points { - if p.Timestamp.Time().IsZero() { - if bp.Timestamp.IsZero() { - p.Timestamp = client.Timestamp(time.Now()) - } else { - p.Timestamp = client.Timestamp(bp.Timestamp) - } - } - if p.Precision == "" && bp.Precision != "" { - p.Precision = bp.Precision - } - p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision)) - if len(bp.Tags) > 0 { - if p.Tags == nil { - p.Tags = make(map[string]string) - } - for k := range bp.Tags { - if p.Tags[k] == "" { - p.Tags[k] = bp.Tags[k] - } - } - } - // Need to convert from a client.Point to a influxdb.Point - points = append(points, Point{ - Name: p.Name, - Tags: p.Tags, - Timestamp: p.Timestamp.Time(), - Fields: p.Fields, - }) - } - - return points, nil -} - // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) {