Merge pull request #1868 from influxdb/update-client-write
Use BatchPoints for writing from client library
pull/1889/head
commit 71ec2bbf2b
@@ -6,6 +6,7 @@
- [#1753](https://github.com/influxdb/influxdb/pull/1874): Do Not Panic on Missing Dirs
- [#1877](https://github.com/influxdb/influxdb/pull/1877): Broker clients track broker leader
- [#1862](https://github.com/influxdb/influxdb/pull/1862): Fix memory leak in `httpd.serveWait`. Thanks @mountkin
- [#1868](https://github.com/influxdb/influxdb/pull/1868): Use `BatchPoints` for `client.Write` method. Thanks @vladlopes, @georgmu, @d2g, @evanphx, @akolosov.

## v0.9.0-rc9 [2015-03-06]
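For context on the `BatchPoints` changelog entry above, here is a minimal usage sketch (not part of the diff) of a write against the new `client.Write` signature introduced in the changes below. The server URL, database, and retention policy names are placeholders.

```go
package main

import (
	"log"
	"net/url"
	"time"

	"github.com/influxdb/influxdb/client"
)

func main() {
	// Placeholder address; point this at a running InfluxDB node.
	u, err := url.Parse("http://localhost:8086")
	if err != nil {
		log.Fatal(err)
	}
	c, err := client.NewClient(client.Config{URL: *u})
	if err != nil {
		log.Fatal(err)
	}

	// Previously each write was a client.Write value; now a single
	// BatchPoints carries the database, retention policy, and points.
	bp := client.BatchPoints{
		Database:        "mydb",
		RetentionPolicy: "myrp",
		Points: []client.Point{
			{Name: "cpu", Fields: map[string]interface{}{"value": 1.1}, Timestamp: time.Now().UTC()},
		},
	}
	if _, err := c.Write(bp); err != nil {
		log.Fatal(err)
	}
}
```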
@@ -12,6 +12,11 @@ import (
"github.com/influxdb/influxdb/influxql"
)

type Query struct {
Command string
Database string
}

type Config struct {
URL url.URL
Username string
@@ -27,17 +32,6 @@ type Client struct {
userAgent string
}

type Query struct {
Command string
Database string
}

type Write struct {
Database string
RetentionPolicy string
Points []Point
}

func NewClient(c Config) (*Client, error) {
client := Client{
url: c.URL,
@@ -81,22 +75,14 @@ func (c *Client) Query(q Query) (*Results, error) {
return &results, nil
}

func (c *Client) Write(writes ...Write) (*Results, error) {
func (c *Client) Write(bp BatchPoints) (*Results, error) {
c.url.Path = "write"
type data struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
}

d := []data{}
for _, write := range writes {
d = append(d, data{Points: write.Points, Database: write.Database, RetentionPolicy: write.RetentionPolicy})
b, err := json.Marshal(&bp)
if err != nil {
return nil, err
}

b := []byte{}
err := json.Unmarshal(b, &d)

req, err := http.NewRequest("POST", c.url.String(), bytes.NewBuffer(b))
if err != nil {
return nil, err
@@ -115,10 +101,14 @@ func (c *Client) Write(writes ...Write) (*Results, error) {
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
err = dec.Decode(&results)

if err != nil {
if err != nil && err.Error() != "EOF" {
return nil, err
}

if resp.StatusCode != http.StatusOK {
return &results, results.Error()
}

return &results, nil
}
@@ -243,28 +233,34 @@ func (a Results) Error() error {
return nil
}

// Timestamp is a custom type so we marshal JSON properly into UTC nanosecond time
type Timestamp time.Time

// Time returns the time represented by the Timestamp
func (t Timestamp) Time() time.Time {
return time.Time(t)
}

// MarshalJSON returns time in UTC with nanoseconds
func (t Timestamp) MarshalJSON() ([]byte, error) {
// Always send back in UTC with nanoseconds
s := t.Time().UTC().Format(time.RFC3339Nano)
return []byte(`"` + s + `"`), nil
}

// Point defines the fields that will be written to the database
type Point struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Timestamp Timestamp `json:"timestamp"`
Fields map[string]interface{} `json:"fields"`
Precision string `json:"precision"`
Name string
Tags map[string]string
Timestamp time.Time
Fields map[string]interface{}
Precision string
}

// MarshalJSON will format the time in RFC3339Nano
// Precision is also ignored as it is only used for writing, not reading
// Or another way to say it is we always send back in nanosecond precision
func (p *Point) MarshalJSON() ([]byte, error) {
point := struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
Fields map[string]interface{} `json:"fields,omitempty"`
}{
Name: p.Name,
Tags: p.Tags,
Fields: p.Fields,
}
// Let it omit empty if it's really zero
if !p.Timestamp.IsZero() {
point.Timestamp = p.Timestamp.UTC().Format(time.RFC3339Nano)
}
return json.Marshal(&point)
}

// UnmarshalJSON decodes the data into the Point struct
@@ -302,7 +298,7 @@ func (p *Point) UnmarshalJSON(b []byte) error {
}
p.Name = epoch.Name
p.Tags = epoch.Tags
p.Timestamp = Timestamp(ts)
p.Timestamp = ts
p.Precision = epoch.Precision
p.Fields = normalizeFields(epoch.Fields)
return nil
@@ -318,7 +314,7 @@ func (p *Point) UnmarshalJSON(b []byte) error {
normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision)
p.Name = normal.Name
p.Tags = normal.Tags
p.Timestamp = Timestamp(normal.Timestamp)
p.Timestamp = normal.Timestamp
p.Precision = normal.Precision
p.Fields = normalizeFields(normal.Fields)
@@ -344,6 +340,73 @@ func normalizeFields(fields map[string]interface{}) map[string]interface{} {
return newFields
}

// BatchPoints is used to send batched data in a single write.
type BatchPoints struct {
Points []Point `json:"points,omitempty"`
Database string `json:"database,omitempty"`
RetentionPolicy string `json:"retentionPolicy,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Timestamp time.Time `json:"timestamp,omitempty"`
Precision string `json:"precision,omitempty"`
}

// UnmarshalJSON decodes the data into the BatchPoints struct
func (bp *BatchPoints) UnmarshalJSON(b []byte) error {
var normal struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
var epoch struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp *int64 `json:"timestamp"`
Precision string `json:"precision"`
}

if err := func() error {
var err error
if err = json.Unmarshal(b, &epoch); err != nil {
return err
}
// Convert from epoch to time.Time
var ts time.Time
if epoch.Timestamp != nil {
ts, err = EpochToTime(*epoch.Timestamp, epoch.Precision)
if err != nil {
return err
}
}
bp.Points = epoch.Points
bp.Database = epoch.Database
bp.RetentionPolicy = epoch.RetentionPolicy
bp.Tags = epoch.Tags
bp.Timestamp = ts
bp.Precision = epoch.Precision
return nil
}(); err == nil {
return nil
}

if err := json.Unmarshal(b, &normal); err != nil {
return err
}
normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision)
bp.Points = normal.Points
bp.Database = normal.Database
bp.RetentionPolicy = normal.RetentionPolicy
bp.Tags = normal.Tags
bp.Timestamp = normal.Timestamp
bp.Precision = normal.Precision

return nil
}

// utility functions

func (c *Client) Addr() string {
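As a hedged illustration of the marshaling behavior added above (not part of the diff): `Point.MarshalJSON` omits nil tags and zero timestamps and formats set timestamps as RFC3339Nano in UTC, so a batch serializes roughly as sketched below. The measurement names and field values are invented.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/influxdb/influxdb/client"
)

func main() {
	bp := client.BatchPoints{
		Database:        "mydb",
		RetentionPolicy: "myrp",
		Points: []client.Point{
			// Zero timestamp and nil tags are omitted by Point.MarshalJSON.
			{Name: "cpu", Fields: map[string]interface{}{"value": 1.1}},
			// A set timestamp is emitted as RFC3339Nano in UTC.
			{Name: "cpu", Fields: map[string]interface{}{"value": 2.2}, Timestamp: time.Date(2015, 3, 9, 0, 0, 0, 0, time.UTC)},
		},
	}

	b, err := json.Marshal(&bp)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // the JSON body that client.Write POSTs to the server
}
```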
@@ -111,8 +111,8 @@ func TestClient_Write(t *testing.T) {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}

write := client.Write{}
_, err = c.Write(write)
bp := client.BatchPoints{}
_, err = c.Write(bp)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
@@ -172,8 +172,8 @@ func TestClient_UserAgent(t *testing.T) {
}

receivedUserAgent = ""
write := client.Write{}
_, err = c.Write(write)
bp := client.BatchPoints{}
_, err = c.Write(bp)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
@@ -259,8 +259,8 @@ func TestPoint_UnmarshalEpoch(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
}
if !p.Timestamp.Time().Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp.Time())
if !p.Timestamp.Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp)
}
}
}
@@ -297,8 +297,54 @@ func TestPoint_UnmarshalRFC(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
}
if !p.Timestamp.Time().Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp.Time())
if !p.Timestamp.Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, p.Timestamp)
}
}
}

func TestPoint_MarshalOmitempty(t *testing.T) {
now := time.Now().UTC()
tests := []struct {
name string
point client.Point
now time.Time
expected string
}{
{
name: "all empty",
point: client.Point{Name: "cpu", Fields: map[string]interface{}{"value": 1.1}},
now: now,
expected: `{"name":"cpu","fields":{"value":1.1}}`,
},
{
name: "with time",
point: client.Point{Name: "cpu", Fields: map[string]interface{}{"value": 1.1}, Timestamp: now},
now: now,
expected: fmt.Sprintf(`{"name":"cpu","timestamp":"%s","fields":{"value":1.1}}`, now.Format(time.RFC3339Nano)),
},
{
name: "with tags",
point: client.Point{Name: "cpu", Fields: map[string]interface{}{"value": 1.1}, Tags: map[string]string{"foo": "bar"}},
now: now,
expected: `{"name":"cpu","tags":{"foo":"bar"},"fields":{"value":1.1}}`,
},
{
name: "with precision",
point: client.Point{Name: "cpu", Fields: map[string]interface{}{"value": 1.1}, Precision: "ms"},
now: now,
expected: `{"name":"cpu","fields":{"value":1.1}}`,
},
}

for _, test := range tests {
t.Logf("testing %q\n", test.name)
b, err := json.Marshal(&test.point)
if err != nil {
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
}
if test.expected != string(b) {
t.Fatalf("Unexpected result. expected: %v, actual: %v", test.expected, string(b))
}
}
}
@@ -340,3 +386,42 @@ func emptyTestServer() *httptest.Server {
return
}))
}

// Ensure that data with epoch timestamps can be decoded.
func TestBatchPoints_Normal(t *testing.T) {
var bp client.BatchPoints
data := []byte(`
{
"database": "foo",
"retentionPolicy": "bar",
"points": [
{
"name": "cpu",
"tags": {
"host": "server01"
},
"timestamp": 14244733039069373,
"precision": "n",
"values": {
"value": 4541770385657154000
}
},
{
"name": "cpu",
"tags": {
"host": "server01"
},
"timestamp": 14244733039069380,
"precision": "n",
"values": {
"value": 7199311900554737000
}
}
]
}
`)

if err := json.Unmarshal(data, &bp); err != nil {
t.Errorf("failed to unmarshal nanosecond data: %s", err.Error())
}
}
@@ -2,6 +2,7 @@ package main_test

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
@@ -16,6 +17,7 @@ import (
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/messaging"

"github.com/influxdb/influxdb/client"
main "github.com/influxdb/influxdb/cmd/influxd"
)
@@ -654,3 +656,105 @@ func Test3NodeServer(t *testing.T) {

runTestsData(t, testName, nodes, "mydb", "myrp")
}

func TestClientLibrary(t *testing.T) {
testName := "single server integration via client library"
if testing.Short() {
t.Skip(fmt.Sprintf("skipping '%s'", testName))
}
dir := tempfile()
defer func() {
os.RemoveAll(dir)
}()

database := "mydb"
retentionPolicy := "myrp"
now := time.Now().UTC()

nodes := createCombinedNodeCluster(t, testName, dir, 1, 8290)
createDatabase(t, testName, nodes, database)
createRetentionPolicy(t, testName, nodes, database, retentionPolicy)

tests := []struct {
name string
bp client.BatchPoints
results client.Results
query client.Query
writeExpected, queryExpected string
writeErr, queryErr string
}{
{
name: "empty batchpoint",
writeErr: "database is required",
writeExpected: `{"error":"database is required"}`,
},
{
name: "no points",
writeExpected: `{}`,
bp: client.BatchPoints{Database: "mydb"},
},
{
name: "one point",
bp: client.BatchPoints{
Database: "mydb",
Points: []client.Point{
{Name: "cpu", Fields: map[string]interface{}{"value": 1.1}, Timestamp: now},
},
},
writeExpected: `{}`,
query: client.Query{Command: `select * from "mydb"."myrp".cpu`},
queryExpected: fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",1.1]]}]}]}`, now.Format(time.RFC3339Nano)),
},
}

c, e := client.NewClient(client.Config{URL: *nodes[0].url})
if e != nil {
t.Fatalf("error creating client: %s", e)
}

for _, test := range tests {
t.Logf("testing %s - %s\n", testName, test.name)
writeResult, err := c.Write(test.bp)
if test.writeErr != errToString(err) {
t.Errorf("unexpected error. expected: %s, got %v", test.writeErr, err)
}
jsonResult := mustMarshalJSON(writeResult)
if test.writeExpected != jsonResult {
t.Logf("write expected result: %s\n", test.writeExpected)
t.Logf("write got result: %s\n", jsonResult)
t.Error("unexpected results")
}

if test.query.Command != "" {
time.Sleep(50 * time.Millisecond)
queryResult, err := c.Query(test.query)
if test.queryErr != errToString(err) {
t.Errorf("unexpected error. expected: %s, got %v", test.queryErr, err)
}
jsonResult := mustMarshalJSON(queryResult)
if test.queryExpected != jsonResult {
t.Logf("query expected result: %s\n", test.queryExpected)
t.Logf("query got result: %s\n", jsonResult)
t.Error("unexpected results")
}

}
}
}

// helper funcs

func errToString(err error) string {
if err != nil {
return err.Error()
}
return ""
}

func mustMarshalJSON(v interface{}) string {
b, e := json.Marshal(v)
if e != nil {
panic(e)
}
return string(b)
}
@@ -22,6 +22,7 @@ import (

"github.com/bmizerany/pat"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/influxql"
)
@@ -176,7 +177,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ

// serveWrite receives incoming series data and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
var bp influxdb.BatchPoints
var bp client.BatchPoints
var dec *json.Decoder

if h.WriteTrace {
@@ -189,6 +190,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ
dec = json.NewDecoder(strings.NewReader(string(b)))
} else {
dec = json.NewDecoder(r.Body)
defer r.Body.Close()
}

var writeError = func(result influxdb.Result, statusCode int) {
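A small sketch (not part of the change) of the decode step that `serveWrite` now performs with the shared `client.BatchPoints` type. The JSON field names follow the struct tags shown earlier in this diff; the sample database, tags, and values are invented.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"

	"github.com/influxdb/influxdb/client"
)

func main() {
	// A write body similar to what the handler receives over HTTP.
	body := `{
		"database": "mydb",
		"retentionPolicy": "myrp",
		"tags": {"host": "server01"},
		"points": [
			{"name": "cpu", "timestamp": 14244733039069373, "precision": "n", "fields": {"value": 1.1}}
		]
	}`

	var bp client.BatchPoints
	dec := json.NewDecoder(strings.NewReader(body))
	if err := dec.Decode(&bp); err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("decoded %d point(s) for database %q\n", len(bp.Points), bp.Database)
}
```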
@@ -16,6 +16,7 @@ import (
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging"
@@ -87,7 +88,7 @@ func TestBatchWrite_UnmarshalEpoch(t *testing.T) {
t.Logf("testing %q\n", test.name)
data := []byte(fmt.Sprintf(`{"timestamp": %d, "precision":"%s"}`, test.epoch, test.precision))
t.Logf("json: %s", string(data))
var bp influxdb.BatchPoints
var bp client.BatchPoints
err := json.Unmarshal(data, &bp)
if err != nil {
t.Fatalf("unexpected error. expected: %v, actual: %v", nil, err)
@@ -125,7 +126,7 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) {
ts := test.now.Format(test.rfc)
data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts))
t.Logf("json: %s", string(data))
var bp influxdb.BatchPoints
var bp client.BatchPoints
err := json.Unmarshal(data, &bp)
if err != nil {
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
145 influxdb.go
@@ -130,112 +130,6 @@ var (
ErrContinuousQueryExists = errors.New("continuous query already exists")
)

// BatchPoints is used to send batched data in a single write.
type BatchPoints struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}

// UnmarshalJSON decodes the data into the BatchPoints struct
func (bp *BatchPoints) UnmarshalJSON(b []byte) error {
var normal struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
var epoch struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp *int64 `json:"timestamp"`
Precision string `json:"precision"`
}

if err := func() error {
var err error
if err = json.Unmarshal(b, &epoch); err != nil {
return err
}
// Convert from epoch to time.Time
var ts time.Time
if epoch.Timestamp != nil {
ts, err = client.EpochToTime(*epoch.Timestamp, epoch.Precision)
if err != nil {
return err
}
}
bp.Points = epoch.Points
bp.Database = epoch.Database
bp.RetentionPolicy = epoch.RetentionPolicy
bp.Tags = epoch.Tags
bp.Timestamp = ts
bp.Precision = epoch.Precision
return nil
}(); err == nil {
return nil
}

if err := json.Unmarshal(b, &normal); err != nil {
return err
}
normal.Timestamp = client.SetPrecision(normal.Timestamp, normal.Precision)
bp.Points = normal.Points
bp.Database = normal.Database
bp.RetentionPolicy = normal.RetentionPolicy
bp.Tags = normal.Tags
bp.Timestamp = normal.Timestamp
bp.Precision = normal.Precision

return nil
}

// NormalizeBatchPoints returns a slice of Points, created by populating individual
// points within the batch, which do not have timestamps or tags, with the top-level
// values.
func NormalizeBatchPoints(bp BatchPoints) ([]Point, error) {
points := []Point{}
for _, p := range bp.Points {
if p.Timestamp.Time().IsZero() {
if bp.Timestamp.IsZero() {
p.Timestamp = client.Timestamp(time.Now())
} else {
p.Timestamp = client.Timestamp(bp.Timestamp)
}
}
if p.Precision == "" && bp.Precision != "" {
p.Precision = bp.Precision
}
p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision))
if len(bp.Tags) > 0 {
if p.Tags == nil {
p.Tags = make(map[string]string)
}
for k := range bp.Tags {
if p.Tags[k] == "" {
p.Tags[k] = bp.Tags[k]
}
}
}
// Need to convert from a client.Point to a influxdb.Point
points = append(points, Point{
Name: p.Name,
Tags: p.Tags,
Timestamp: p.Timestamp.Time(),
Fields: p.Fields,
})
}

return points, nil
}

// ErrAuthorize represents an authorization error.
type ErrAuthorize struct {
text string
@@ -286,3 +180,42 @@ func assert(condition bool, msg string, v ...interface{}) {

func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }

// NormalizeBatchPoints returns a slice of Points, created by populating individual
// points within the batch, which do not have timestamps or tags, with the top-level
// values.
func NormalizeBatchPoints(bp client.BatchPoints) ([]Point, error) {
points := []Point{}
for _, p := range bp.Points {
if p.Timestamp.IsZero() {
if bp.Timestamp.IsZero() {
p.Timestamp = time.Now()
} else {
p.Timestamp = bp.Timestamp
}
}
if p.Precision == "" && bp.Precision != "" {
p.Precision = bp.Precision
}
p.Timestamp = client.SetPrecision(p.Timestamp, p.Precision)
if len(bp.Tags) > 0 {
if p.Tags == nil {
p.Tags = make(map[string]string)
}
for k := range bp.Tags {
if p.Tags[k] == "" {
p.Tags[k] = bp.Tags[k]
}
}
}
// Need to convert from a client.Point to a influxdb.Point
points = append(points, Point{
Name: p.Name,
Tags: p.Tags,
Timestamp: p.Timestamp,
Fields: p.Fields,
})
}

return points, nil
}
100 influxdb_test.go
@@ -1,47 +1,75 @@
package influxdb_test

import (
"encoding/json"
"reflect"
"testing"
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
)

// Ensure that data with epoch timestamps can be decoded.
func TestBatchPoints_Normal(t *testing.T) {
var p influxdb.BatchPoints
data := []byte(`
{
"database": "foo",
"retentionPolicy": "bar",
"points": [
{
"name": "cpu",
"tags": {
"host": "server01"
},
"timestamp": 14244733039069373,
"precision": "n",
"values": {
"value": 4541770385657154000
}
},
{
"name": "cpu",
"tags": {
"host": "server01"
},
"timestamp": 14244733039069380,
"precision": "n",
"values": {
"value": 7199311900554737000
}
}
]
}
`)
func TestNormalizeBatchPoints(t *testing.T) {
now := time.Now()
tests := []struct {
name string
bp client.BatchPoints
p []influxdb.Point
err string
}{
{
name: "default",
bp: client.BatchPoints{
Points: []client.Point{
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}},
},
},
p: []influxdb.Point{
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}},
},
},
{
name: "merge timestamp",
bp: client.BatchPoints{
Timestamp: now,
Points: []client.Point{
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Fields: map[string]interface{}{"value": 1.0}},
},
},
p: []influxdb.Point{
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}},
},
},
{
name: "merge tags",
bp: client.BatchPoints{
Tags: map[string]string{"day": "monday"},
Points: []client.Point{
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}},
{Name: "memory", Timestamp: now, Fields: map[string]interface{}{"value": 2.0}},
},
},
p: []influxdb.Point{
{Name: "cpu", Tags: map[string]string{"day": "monday", "region": "useast"}, Timestamp: now, Fields: map[string]interface{}{"value": 1.0}},
{Name: "memory", Tags: map[string]string{"day": "monday"}, Timestamp: now, Fields: map[string]interface{}{"value": 2.0}},
},
},
}

if err := json.Unmarshal(data, &p); err != nil {
t.Errorf("failed to unmarshal nanosecond data: %s", err.Error())
for _, test := range tests {
t.Logf("running test %q", test.name)
p, e := influxdb.NormalizeBatchPoints(test.bp)
if test.err == "" && e != nil {
t.Errorf("unexpected error %v", e)
} else if test.err != "" && e == nil {
t.Errorf("expected error %s, got <nil>", test.err)
} else if e != nil && test.err != e.Error() {
t.Errorf("unexpected error. expected: %s, got %v", test.err, e)
}
if !reflect.DeepEqual(p, test.p) {
t.Logf("expected: %+v", test.p)
t.Logf("got: %+v", p)
t.Error("failed to normalize.")
}
}
}
@@ -3,9 +3,11 @@ package udp
import (
"bytes"
"encoding/json"
"github.com/influxdb/influxdb"
"log"
"net"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
)

const (
@@ -44,7 +46,7 @@ func (u *UDPServer) ListenAndServe(iface string) error {
return err
}

var bp influxdb.BatchPoints
var bp client.BatchPoints
buf := make([]byte, udpBufferSize)

go func() {