Use BatchPoints for writing from client library

pull/1868/head
Cory LaNou 2015-03-06 10:29:32 -07:00
parent 17c5ca0e74
commit bda9685526
8 changed files with 166 additions and 182 deletions
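For orientation, a minimal sketch (not part of the diff) of what calling code looks like after this change: the variadic Write(writes ...Write) call is replaced by a single client.BatchPoints value. The server address, database name, and measurement values below are illustrative assumptions.

// Sketch only: example usage of the new Write(BatchPoints) API.
package main

import (
	"log"
	"net/url"
	"time"

	"github.com/influxdb/influxdb/client"
)

func main() {
	u, err := url.Parse("http://localhost:8086") // example address
	if err != nil {
		log.Fatal(err)
	}
	c, err := client.NewClient(client.Config{URL: *u})
	if err != nil {
		log.Fatal(err)
	}

	// One BatchPoints value carries the points plus the batch-level
	// database, retention policy, tags, and timestamp.
	bp := client.BatchPoints{
		Database:        "mydb",
		RetentionPolicy: "default",
		Timestamp:       time.Now(),
		Tags:            map[string]string{"host": "server01"},
		Points: []client.Point{
			{Name: "cpu", Fields: map[string]interface{}{"value": 0.64}},
		},
	}
	if _, err := c.Write(bp); err != nil {
		log.Fatal(err)
	}
}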

View File

@@ -12,6 +12,11 @@ import (
"github.com/influxdb/influxdb/influxql"
)
type Query struct {
Command string
Database string
}
type Config struct {
URL url.URL
Username string
@@ -27,17 +32,6 @@ type Client struct {
userAgent string
}
type Query struct {
Command string
Database string
}
type Write struct {
Database string
RetentionPolicy string
Points []Point
}
func NewClient(c Config) (*Client, error) {
client := Client{
url: c.URL,
@@ -49,6 +43,73 @@ func NewClient(c Config) (*Client, error) {
return &client, nil
}
// BatchPoints is used to send batched data in a single write.
type BatchPoints struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
// UnmarshalJSON decodes the data into the BatchPoints struct
func (bp *BatchPoints) UnmarshalJSON(b []byte) error {
var normal struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
var epoch struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp *int64 `json:"timestamp"`
Precision string `json:"precision"`
}
if err := func() error {
var err error
if err = json.Unmarshal(b, &epoch); err != nil {
return err
}
// Convert from epoch to time.Time
var ts time.Time
if epoch.Timestamp != nil {
ts, err = EpochToTime(*epoch.Timestamp, epoch.Precision)
if err != nil {
return err
}
}
bp.Points = epoch.Points
bp.Database = epoch.Database
bp.RetentionPolicy = epoch.RetentionPolicy
bp.Tags = epoch.Tags
bp.Timestamp = ts
bp.Precision = epoch.Precision
return nil
}(); err == nil {
return nil
}
if err := json.Unmarshal(b, &normal); err != nil {
return err
}
normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision)
bp.Points = normal.Points
bp.Database = normal.Database
bp.RetentionPolicy = normal.RetentionPolicy
bp.Tags = normal.Tags
bp.Timestamp = normal.Timestamp
bp.Precision = normal.Precision
return nil
}
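The UnmarshalJSON added above accepts two payload shapes: an integer epoch timestamp together with a precision (decoded into the epoch struct and converted with EpochToTime), or a regular RFC3339 time string (decoded into the normal struct and run through SetPrecision). A minimal sketch, reusing the timestamp and precision values from the tests further down; not part of the diff:

// Sketch only: both timestamp forms decode into client.BatchPoints.
package main

import (
	"encoding/json"
	"log"

	"github.com/influxdb/influxdb/client"
)

func main() {
	var bp client.BatchPoints

	// Integer epoch plus precision: the epoch branch succeeds and
	// EpochToTime converts the value ("n" = nanoseconds).
	epochJSON := []byte(`{"database":"foo","timestamp":14244733039069373,"precision":"n"}`)
	if err := json.Unmarshal(epochJSON, &bp); err != nil {
		log.Fatal(err)
	}
	log.Println(bp.Timestamp)

	// RFC3339 string: the epoch branch fails on the string timestamp,
	// so the normal time.Time path is used instead.
	rfcJSON := []byte(`{"database":"foo","timestamp":"2015-03-06T10:29:32Z"}`)
	if err := json.Unmarshal(rfcJSON, &bp); err != nil {
		log.Fatal(err)
	}
	log.Println(bp.Timestamp)
}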
func (c *Client) Query(q Query) (*Results, error) {
u := c.url
@@ -81,7 +142,7 @@ func (c *Client) Query(q Query) (*Results, error) {
return &results, nil
}
func (c *Client) Write(writes ...Write) (*Results, error) {
func (c *Client) Write(bp BatchPoints) (*Results, error) {
c.url.Path = "write"
type data struct {
Points []Point `json:"points"`
@@ -89,13 +150,8 @@ func (c *Client) Write(writes ...Write) (*Results, error) {
RetentionPolicy string `json:"retentionPolicy"`
}
d := []data{}
for _, write := range writes {
d = append(d, data{Points: write.Points, Database: write.Database, RetentionPolicy: write.RetentionPolicy})
}
b := []byte{}
err := json.Unmarshal(b, &d)
err := json.Unmarshal(b, &bp)
req, err := http.NewRequest("POST", c.url.String(), bytes.NewBuffer(b))
if err != nil {

View File

@@ -111,8 +111,8 @@ func TestClient_Write(t *testing.T) {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
write := client.Write{}
_, err = c.Write(write)
bp := client.BatchPoints{}
_, err = c.Write(bp)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
@@ -172,8 +172,8 @@ func TestClient_UserAgent(t *testing.T) {
}
receivedUserAgent = ""
write := client.Write{}
_, err = c.Write(write)
bp := client.BatchPoints{}
_, err = c.Write(bp)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
@@ -340,3 +340,42 @@ func emptyTestServer() *httptest.Server {
return
}))
}
// Ensure that data with epoch timestamps can be decoded.
func TestBatchPoints_Normal(t *testing.T) {
var bp client.BatchPoints
data := []byte(`
{
"database": "foo",
"retentionPolicy": "bar",
"points": [
{
"name": "cpu",
"tags": {
"host": "server01"
},
"timestamp": 14244733039069373,
"precision": "n",
"values": {
"value": 4541770385657154000
}
},
{
"name": "cpu",
"tags": {
"host": "server01"
},
"timestamp": 14244733039069380,
"precision": "n",
"values": {
"value": 7199311900554737000
}
}
]
}
`)
if err := json.Unmarshal(data, &bp); err != nil {
t.Errorf("failed to unmarshal nanosecond data: %s", err.Error())
}
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/bmizerany/pat"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/influxql"
)
@@ -176,7 +177,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ
// serveWrite receives incoming series data and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
var bp influxdb.BatchPoints
var bp client.BatchPoints
var dec *json.Decoder
if h.WriteTrace {

View File

@@ -16,6 +16,7 @@ import (
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging"
@@ -87,7 +88,7 @@ func TestBatchWrite_UnmarshalEpoch(t *testing.T) {
t.Logf("testing %q\n", test.name)
data := []byte(fmt.Sprintf(`{"timestamp": %d, "precision":"%s"}`, test.epoch, test.precision))
t.Logf("json: %s", string(data))
var bp influxdb.BatchPoints
var bp client.BatchPoints
err := json.Unmarshal(data, &bp)
if err != nil {
t.Fatalf("unexpected error. expected: %v, actual: %v", nil, err)
@@ -125,7 +126,7 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) {
ts := test.now.Format(test.rfc)
data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts))
t.Logf("json: %s", string(data))
var bp influxdb.BatchPoints
var bp client.BatchPoints
err := json.Unmarshal(data, &bp)
if err != nil {
t.Fatalf("unexpected error. expected: %v, actual: %v", nil, err)

View File

@@ -5,9 +5,6 @@ import (
"errors"
"fmt"
"os"
"time"
"github.com/influxdb/influxdb/client"
)
var (
@@ -130,112 +127,6 @@ var (
ErrContinuousQueryExists = errors.New("continuous query already exists")
)
// BatchPoints is used to send batched data in a single write.
type BatchPoints struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
// UnmarshalJSON decodes the data into the BatchPoints struct
func (bp *BatchPoints) UnmarshalJSON(b []byte) error {
var normal struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
var epoch struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp *int64 `json:"timestamp"`
Precision string `json:"precision"`
}
if err := func() error {
var err error
if err = json.Unmarshal(b, &epoch); err != nil {
return err
}
// Convert from epoch to time.Time
var ts time.Time
if epoch.Timestamp != nil {
ts, err = client.EpochToTime(*epoch.Timestamp, epoch.Precision)
if err != nil {
return err
}
}
bp.Points = epoch.Points
bp.Database = epoch.Database
bp.RetentionPolicy = epoch.RetentionPolicy
bp.Tags = epoch.Tags
bp.Timestamp = ts
bp.Precision = epoch.Precision
return nil
}(); err == nil {
return nil
}
if err := json.Unmarshal(b, &normal); err != nil {
return err
}
normal.Timestamp = client.SetPrecision(normal.Timestamp, normal.Precision)
bp.Points = normal.Points
bp.Database = normal.Database
bp.RetentionPolicy = normal.RetentionPolicy
bp.Tags = normal.Tags
bp.Timestamp = normal.Timestamp
bp.Precision = normal.Precision
return nil
}
// NormalizeBatchPoints returns a slice of Points, created by populating individual
// points within the batch, which do not have timestamps or tags, with the top-level
// values.
func NormalizeBatchPoints(bp BatchPoints) ([]Point, error) {
points := []Point{}
for _, p := range bp.Points {
if p.Timestamp.Time().IsZero() {
if bp.Timestamp.IsZero() {
p.Timestamp = client.Timestamp(time.Now())
} else {
p.Timestamp = client.Timestamp(bp.Timestamp)
}
}
if p.Precision == "" && bp.Precision != "" {
p.Precision = bp.Precision
}
p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision))
if len(bp.Tags) > 0 {
if p.Tags == nil {
p.Tags = make(map[string]string)
}
for k := range bp.Tags {
if p.Tags[k] == "" {
p.Tags[k] = bp.Tags[k]
}
}
}
// Need to convert from a client.Point to an influxdb.Point
points = append(points, Point{
Name: p.Name,
Tags: p.Tags,
Timestamp: p.Timestamp.Time(),
Fields: p.Fields,
})
}
return points, nil
}
// ErrAuthorize represents an authorization error.
type ErrAuthorize struct {
text string

View File

@@ -1,47 +1 @@
package influxdb_test
import (
"encoding/json"
"testing"
"github.com/influxdb/influxdb"
)
// Ensure that data with epoch timestamps can be decoded.
func TestBatchPoints_Normal(t *testing.T) {
var p influxdb.BatchPoints
data := []byte(`
{
"database": "foo",
"retentionPolicy": "bar",
"points": [
{
"name": "cpu",
"tags": {
"host": "server01"
},
"timestamp": 14244733039069373,
"precision": "n",
"values": {
"value": 4541770385657154000
}
},
{
"name": "cpu",
"tags": {
"host": "server01"
},
"timestamp": 14244733039069380,
"precision": "n",
"values": {
"value": 7199311900554737000
}
}
]
}
`)
if err := json.Unmarshal(data, &p); err != nil {
t.Errorf("failed to unmarshal nanosecond data: %s", err.Error())
}
}

View File

@@ -19,6 +19,7 @@ import (
"sync"
"time"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging"
"golang.org/x/crypto/bcrypt"
@@ -1367,6 +1368,45 @@ type Point struct {
Fields map[string]interface{}
}
// NormalizeBatchPoints returns a slice of Points, created by populating individual
// points within the batch, which do not have timestamps or tags, with the top-level
// values.
func NormalizeBatchPoints(bp client.BatchPoints) ([]Point, error) {
points := []Point{}
for _, p := range bp.Points {
if p.Timestamp.Time().IsZero() {
if bp.Timestamp.IsZero() {
p.Timestamp = client.Timestamp(time.Now())
} else {
p.Timestamp = client.Timestamp(bp.Timestamp)
}
}
if p.Precision == "" && bp.Precision != "" {
p.Precision = bp.Precision
}
p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision))
if len(bp.Tags) > 0 {
if p.Tags == nil {
p.Tags = make(map[string]string)
}
for k := range bp.Tags {
if p.Tags[k] == "" {
p.Tags[k] = bp.Tags[k]
}
}
}
// Need to convert from a client.Point to an influxdb.Point
points = append(points, Point{
Name: p.Name,
Tags: p.Tags,
Timestamp: p.Timestamp.Time(),
Fields: p.Fields,
})
}
return points, nil
}
// WriteSeries writes series data to the database.
// Returns the messaging index the data was written to.
func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) {
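As a rough illustration of the NormalizeBatchPoints helper added above (a sketch with made-up values, not part of the diff): a point that has no timestamp or tags of its own inherits them from the batch before being converted to an influxdb.Point.

// Sketch only: batch-level timestamp and tags flow into a bare point.
package main

import (
	"log"
	"time"

	"github.com/influxdb/influxdb"
	"github.com/influxdb/influxdb/client"
)

func main() {
	bp := client.BatchPoints{
		Database:  "mydb",
		Timestamp: time.Now(),
		Tags:      map[string]string{"host": "server01"},
		Points: []client.Point{
			// No timestamp or tags of its own, so the batch values apply.
			{Name: "cpu", Fields: map[string]interface{}{"value": 0.64}},
		},
	}

	points, err := influxdb.NormalizeBatchPoints(bp)
	if err != nil {
		log.Fatal(err)
	}
	// points[0].Timestamp is derived from bp.Timestamp and points[0].Tags
	// now contains the batch-level "host" tag.
	log.Println(points[0].Timestamp, points[0].Tags)
}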

View File

@@ -3,9 +3,11 @@ package udp
import (
"bytes"
"encoding/json"
"github.com/influxdb/influxdb"
"log"
"net"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
)
const (
@@ -44,7 +46,7 @@ func (u *UDPServer) ListenAndServe(iface string) error {
return err
}
var bp influxdb.BatchPoints
var bp client.BatchPoints
buf := make([]byte, udpBufferSize)
go func() {