add tests for NormalizeBatchPoints. Remove custom type client.Timestamp

pull/1868/head
Cory LaNou 2015-03-06 11:20:30 -07:00
parent bda9685526
commit 4f5ad7399e
5 changed files with 208 additions and 114 deletions

View File

@ -43,73 +43,6 @@ func NewClient(c Config) (*Client, error) {
return &client, nil return &client, nil
} }
// BatchPoints is used to send batched data in a single write.
type BatchPoints struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
// UnmarshalJSON decodes the data into the BatchPoints struct
func (bp *BatchPoints) UnmarshalJSON(b []byte) error {
var normal struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
var epoch struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp *int64 `json:"timestamp"`
Precision string `json:"precision"`
}
if err := func() error {
var err error
if err = json.Unmarshal(b, &epoch); err != nil {
return err
}
// Convert from epoch to time.Time
var ts time.Time
if epoch.Timestamp != nil {
ts, err = EpochToTime(*epoch.Timestamp, epoch.Precision)
if err != nil {
return err
}
}
bp.Points = epoch.Points
bp.Database = epoch.Database
bp.RetentionPolicy = epoch.RetentionPolicy
bp.Tags = epoch.Tags
bp.Timestamp = ts
bp.Precision = epoch.Precision
return nil
}(); err == nil {
return nil
}
if err := json.Unmarshal(b, &normal); err != nil {
return err
}
normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision)
bp.Points = normal.Points
bp.Database = normal.Database
bp.RetentionPolicy = normal.RetentionPolicy
bp.Tags = normal.Tags
bp.Timestamp = normal.Timestamp
bp.Precision = normal.Precision
return nil
}
func (c *Client) Query(q Query) (*Results, error) { func (c *Client) Query(q Query) (*Results, error) {
u := c.url u := c.url
@ -318,11 +251,29 @@ func (t Timestamp) MarshalJSON() ([]byte, error) {
type Point struct { type Point struct {
Name string `json:"name"` Name string `json:"name"`
Tags map[string]string `json:"tags"` Tags map[string]string `json:"tags"`
Timestamp Timestamp `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
Fields map[string]interface{} `json:"fields"` Fields map[string]interface{} `json:"fields"`
Precision string `json:"precision"` Precision string `json:"precision"`
} }
// MarshalJSON will format the time in RFC3339Nano
func (p *Point) MarshalJSON() ([]byte, error) {
point := struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Timestamp string `json:"timestamp"`
Fields map[string]interface{} `json:"fields"`
Precision string `json:"precision"`
}{
Name: p.Name,
Tags: p.Tags,
Timestamp: p.Timestamp.UTC().Format(time.RFC3339Nano),
Fields: p.Fields,
Precision: p.Precision,
}
return json.Marshal(&point)
}
// UnmarshalJSON decodes the data into the Point struct // UnmarshalJSON decodes the data into the Point struct
func (p *Point) UnmarshalJSON(b []byte) error { func (p *Point) UnmarshalJSON(b []byte) error {
var normal struct { var normal struct {
@ -358,7 +309,7 @@ func (p *Point) UnmarshalJSON(b []byte) error {
} }
p.Name = epoch.Name p.Name = epoch.Name
p.Tags = epoch.Tags p.Tags = epoch.Tags
p.Timestamp = Timestamp(ts) p.Timestamp = ts
p.Precision = epoch.Precision p.Precision = epoch.Precision
p.Fields = normalizeFields(epoch.Fields) p.Fields = normalizeFields(epoch.Fields)
return nil return nil
@ -374,7 +325,7 @@ func (p *Point) UnmarshalJSON(b []byte) error {
normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision) normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision)
p.Name = normal.Name p.Name = normal.Name
p.Tags = normal.Tags p.Tags = normal.Tags
p.Timestamp = Timestamp(normal.Timestamp) p.Timestamp = normal.Timestamp
p.Precision = normal.Precision p.Precision = normal.Precision
p.Fields = normalizeFields(normal.Fields) p.Fields = normalizeFields(normal.Fields)
@ -400,6 +351,73 @@ func normalizeFields(fields map[string]interface{}) map[string]interface{} {
return newFields return newFields
} }
// BatchPoints is used to send batched data in a single write.
type BatchPoints struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
// UnmarshalJSON decodes the data into the BatchPoints struct
func (bp *BatchPoints) UnmarshalJSON(b []byte) error {
var normal struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
var epoch struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp *int64 `json:"timestamp"`
Precision string `json:"precision"`
}
if err := func() error {
var err error
if err = json.Unmarshal(b, &epoch); err != nil {
return err
}
// Convert from epoch to time.Time
var ts time.Time
if epoch.Timestamp != nil {
ts, err = EpochToTime(*epoch.Timestamp, epoch.Precision)
if err != nil {
return err
}
}
bp.Points = epoch.Points
bp.Database = epoch.Database
bp.RetentionPolicy = epoch.RetentionPolicy
bp.Tags = epoch.Tags
bp.Timestamp = ts
bp.Precision = epoch.Precision
return nil
}(); err == nil {
return nil
}
if err := json.Unmarshal(b, &normal); err != nil {
return err
}
normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision)
bp.Points = normal.Points
bp.Database = normal.Database
bp.RetentionPolicy = normal.RetentionPolicy
bp.Tags = normal.Tags
bp.Timestamp = normal.Timestamp
bp.Precision = normal.Precision
return nil
}
// utility functions // utility functions
func (c *Client) Addr() string { func (c *Client) Addr() string {

View File

@ -259,8 +259,8 @@ func TestPoint_UnmarshalEpoch(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
} }
if !p.Timestamp.Time().Equal(test.expected) { if !p.Timestamp.Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp.Time()) t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp)
} }
} }
} }
@ -297,8 +297,8 @@ func TestPoint_UnmarshalRFC(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
} }
if !p.Timestamp.Time().Equal(test.expected) { if !p.Timestamp.Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp.Time()) t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp)
} }
} }
} }

View File

@ -5,6 +5,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"os" "os"
"time"
"github.com/influxdb/influxdb/client"
) )
var ( var (
@ -177,3 +180,42 @@ func assert(condition bool, msg string, v ...interface{}) {
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
// NormalizeBatchPoints returns a slice of Points, created by populating individual
// points within the batch, which do not have timestamps or tags, with the top-level
// values.
func NormalizeBatchPoints(bp client.BatchPoints) ([]Point, error) {
points := []Point{}
for _, p := range bp.Points {
if p.Timestamp.IsZero() {
if bp.Timestamp.IsZero() {
p.Timestamp = time.Now()
} else {
p.Timestamp = bp.Timestamp
}
}
if p.Precision == "" && bp.Precision != "" {
p.Precision = bp.Precision
}
p.Timestamp = client.SetPrecision(p.Timestamp, p.Precision)
if len(bp.Tags) > 0 {
if p.Tags == nil {
p.Tags = make(map[string]string)
}
for k := range bp.Tags {
if p.Tags[k] == "" {
p.Tags[k] = bp.Tags[k]
}
}
}
// Need to convert from a client.Point to a influxdb.Point
points = append(points, Point{
Name: p.Name,
Tags: p.Tags,
Timestamp: p.Timestamp,
Fields: p.Fields,
})
}
return points, nil
}

View File

@ -1 +1,75 @@
package influxdb_test package influxdb_test
import (
"reflect"
"testing"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
)
func TestNormalizeBatchPoints(t *testing.T) {
now := time.Now()
tests := []struct {
name string
bp client.BatchPoints
p []influxdb.Point
err string
}{
{
name: "default",
bp: client.BatchPoints{
Points: []client.Point{
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}},
},
},
p: []influxdb.Point{
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}},
},
},
{
name: "merge timestamp",
bp: client.BatchPoints{
Timestamp: now,
Points: []client.Point{
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Fields: map[string]interface{}{"value": 1.0}},
},
},
p: []influxdb.Point{
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}},
},
},
{
name: "merge tags",
bp: client.BatchPoints{
Tags: map[string]string{"day": "monday"},
Points: []client.Point{
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}},
{Name: "memory", Timestamp: now, Fields: map[string]interface{}{"value": 2.0}},
},
},
p: []influxdb.Point{
{Name: "cpu", Tags: map[string]string{"day": "monday", "region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}},
{Name: "memory", Tags: map[string]string{"day": "monday"}, Timestamp: now, Fields: map[string]interface{}{"value": 2.0}},
},
},
}
for _, test := range tests {
t.Logf("running test %q", test.name)
p, e := influxdb.NormalizeBatchPoints(test.bp)
if test.err == "" && e != nil {
t.Error("unexpected error %s", e)
} else if test.err != "" && e == nil {
t.Error("expected error %s, got <nil>", test.err)
} else if e != nil && test.err != e.Error() {
t.Error("unexpected error. expected: %s, got %s", test.err, e)
}
if !reflect.DeepEqual(p, test.p) {
t.Logf("expected: %+v", test.p)
t.Logf("got: %+v", p)
t.Error("failed to normalize.")
}
}
}

View File

@ -19,7 +19,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging" "github.com/influxdb/influxdb/messaging"
"golang.org/x/crypto/bcrypt" "golang.org/x/crypto/bcrypt"
@ -1368,45 +1367,6 @@ type Point struct {
Fields map[string]interface{} Fields map[string]interface{}
} }
// NormalizeBatchPoints returns a slice of Points, created by populating individual
// points within the batch, which do not have timestamps or tags, with the top-level
// values.
func NormalizeBatchPoints(bp client.BatchPoints) ([]Point, error) {
points := []Point{}
for _, p := range bp.Points {
if p.Timestamp.Time().IsZero() {
if bp.Timestamp.IsZero() {
p.Timestamp = client.Timestamp(time.Now())
} else {
p.Timestamp = client.Timestamp(bp.Timestamp)
}
}
if p.Precision == "" && bp.Precision != "" {
p.Precision = bp.Precision
}
p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision))
if len(bp.Tags) > 0 {
if p.Tags == nil {
p.Tags = make(map[string]string)
}
for k := range bp.Tags {
if p.Tags[k] == "" {
p.Tags[k] = bp.Tags[k]
}
}
}
// Need to convert from a client.Point to a influxdb.Point
points = append(points, Point{
Name: p.Name,
Tags: p.Tags,
Timestamp: p.Timestamp.Time(),
Fields: p.Fields,
})
}
return points, nil
}
// WriteSeries writes series data to the database. // WriteSeries writes series data to the database.
// Returns the messaging index the data was written to. // Returns the messaging index the data was written to.
func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) { func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) {