Convert client to use line protocol by default

Keeps the API unchanged for now although many of the JSON features
don't map well to the line protocol endpoint so we might want to
remove them when the JSON api is removed.
pull/2725/head
Jason Wilder 2015-06-01 16:22:12 -06:00
parent ed8470d50d
commit f7d90cf94e
1 changed files with 41 additions and 15 deletions

View File

@ -6,11 +6,13 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb"
)
// Query is used to send a command to the server. Both Command and Database are required.
@ -39,6 +41,13 @@ type Client struct {
userAgent string
}
const (
ConsistencyOne = "one"
ConsistencyAll = "all"
ConsistencyQuorum = "quorum"
ConsistencyAny = "any"
)
// NewClient will instantiate and return a connected client to issue commands to the server.
func NewClient(c Config) (*Client, error) {
client := Client{
@ -111,12 +120,18 @@ func (c *Client) Query(q Query) (*Response, error) {
func (c *Client) Write(bp BatchPoints) (*Response, error) {
c.url.Path = "write"
b, err := json.Marshal(&bp)
if err != nil {
return nil, err
var b bytes.Buffer
for _, p := range bp.Points {
if _, err := b.WriteString(p.MarshalString()); err != nil {
return nil, err
}
if err := b.WriteByte('\n'); err != nil {
return nil, err
}
}
req, err := http.NewRequest("POST", c.url.String(), bytes.NewBuffer(b))
req, err := http.NewRequest("POST", c.url.String(), &b)
if err != nil {
return nil, err
}
@ -125,6 +140,12 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Add("db", bp.Database)
params.Add("rp", bp.RetentionPolicy)
params.Add("precision", bp.Precision)
params.Add("consistency", bp.WriteConsistency)
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
@ -133,15 +154,15 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
defer resp.Body.Close()
var response Response
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
err = dec.Decode(&response)
body, err := ioutil.ReadAll(resp.Body)
if err != nil && err.Error() != "EOF" {
return nil, err
}
if resp.StatusCode != http.StatusNoContent {
return &response, response.Error()
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = fmt.Errorf(string(body))
response.Err = err
return &response, err
}
return nil, nil
@ -338,6 +359,10 @@ func (p *Point) MarshalJSON() ([]byte, error) {
return json.Marshal(&point)
}
func (p *Point) MarshalString() string {
return tsdb.NewPoint(p.Name, p.Tags, p.Fields, p.Time).String()
}
// UnmarshalJSON decodes the data into the Point struct
func (p *Point) UnmarshalJSON(b []byte) error {
var normal struct {
@ -423,12 +448,13 @@ func normalizeFields(fields map[string]interface{}) map[string]interface{} {
// Precision can be specified if the time is in epoch format (integer).
// Valid values for Precision are n, u, ms, s, m, and h
type BatchPoints struct {
Points []Point `json:"points,omitempty"`
Database string `json:"database,omitempty"`
RetentionPolicy string `json:"retentionPolicy,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Time time.Time `json:"time,omitempty"`
Precision string `json:"precision,omitempty"`
Points []Point `json:"points,omitempty"`
Database string `json:"database,omitempty"`
RetentionPolicy string `json:"retentionPolicy,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Time time.Time `json:"time,omitempty"`
Precision string `json:"precision,omitempty"`
WriteConsistency string `json:"-"`
}
// UnmarshalJSON decodes the data into the BatchPoints struct