2015-01-22 22:23:55 +00:00
|
|
|
package client
|
|
|
|
|
2015-01-22 23:40:32 +00:00
|
|
|
import (
|
2015-01-23 20:37:53 +00:00
|
|
|
"bytes"
|
2015-01-23 00:18:24 +00:00
|
|
|
"encoding/json"
|
2015-01-29 21:07:43 +00:00
|
|
|
"errors"
|
2015-01-29 18:10:13 +00:00
|
|
|
"fmt"
|
2015-01-22 23:40:32 +00:00
|
|
|
"net/http"
|
|
|
|
"net/url"
|
|
|
|
"time"
|
|
|
|
|
2015-01-29 21:07:43 +00:00
|
|
|
"github.com/influxdb/influxdb/influxql"
|
2015-01-22 23:40:32 +00:00
|
|
|
)
|
|
|
|
|
2015-01-22 22:23:55 +00:00
|
|
|
// Config carries the settings used to construct a Client: the server URL
// and the credentials copied into the client by NewClient.
type Config struct {
	// URL is the base URL of the server.
	URL url.URL

	// Username and Password are the credentials stored on the client.
	Username, Password string
}
|
|
|
|
|
|
|
|
// Client holds the connection settings and the HTTP client used to issue
// requests against a server.
type Client struct {
	// url is the base URL; request methods set the path on it.
	url url.URL

	// username and password are copied from the Config at construction.
	username, password string

	// httpClient performs the HTTP requests.
	httpClient *http.Client
}
|
|
|
|
|
|
|
|
// Query describes a single query request: the command text, sent as the "q"
// URL parameter, and the target database, sent as the "db" parameter.
type Query struct {
	Command, Database string
}
|
|
|
|
|
|
|
|
type Write struct {
|
2015-01-23 20:37:53 +00:00
|
|
|
Database string
|
|
|
|
RetentionPolicy string
|
2015-01-29 21:07:43 +00:00
|
|
|
Points []Point
|
2015-01-22 22:23:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewClient(c Config) (*Client, error) {
|
2015-01-22 23:40:32 +00:00
|
|
|
client := Client{
|
2015-01-23 22:49:23 +00:00
|
|
|
url: c.URL,
|
2015-01-23 18:05:04 +00:00
|
|
|
username: c.Username,
|
|
|
|
password: c.Password,
|
2015-01-22 23:40:32 +00:00
|
|
|
httpClient: &http.Client{},
|
|
|
|
}
|
|
|
|
return &client, nil
|
2015-01-22 22:23:55 +00:00
|
|
|
}
|
|
|
|
|
2015-01-29 21:07:43 +00:00
|
|
|
func (c *Client) Query(q Query) (*Results, error) {
|
2015-01-26 21:12:58 +00:00
|
|
|
u := c.url
|
|
|
|
|
|
|
|
u.Path = "query"
|
|
|
|
values := u.Query()
|
2015-01-23 00:18:24 +00:00
|
|
|
values.Set("q", q.Command)
|
|
|
|
values.Set("db", q.Database)
|
2015-01-26 21:12:58 +00:00
|
|
|
u.RawQuery = values.Encode()
|
2015-01-23 00:18:24 +00:00
|
|
|
|
2015-01-26 21:34:24 +00:00
|
|
|
resp, err := c.httpClient.Get(u.String())
|
2015-01-23 00:18:24 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
2015-01-29 21:07:43 +00:00
|
|
|
var results Results
|
2015-01-23 18:07:41 +00:00
|
|
|
err = json.NewDecoder(resp.Body).Decode(&results)
|
2015-01-23 00:18:24 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2015-01-27 20:27:29 +00:00
|
|
|
return &results, nil
|
2015-01-22 22:23:55 +00:00
|
|
|
}
|
|
|
|
|
2015-01-29 21:07:43 +00:00
|
|
|
func (c *Client) Write(writes ...Write) (*Results, error) {
|
2015-01-23 22:49:23 +00:00
|
|
|
c.url.Path = "write"
|
2015-01-23 20:37:53 +00:00
|
|
|
type data struct {
|
2015-01-29 21:07:43 +00:00
|
|
|
Points []Point `json:"points"`
|
|
|
|
Database string `json:"database"`
|
|
|
|
RetentionPolicy string `json:"retentionPolicy"`
|
2015-01-23 20:37:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
d := []data{}
|
|
|
|
for _, write := range writes {
|
|
|
|
d = append(d, data{Points: write.Points, Database: write.Database, RetentionPolicy: write.RetentionPolicy})
|
|
|
|
}
|
|
|
|
|
|
|
|
b := []byte{}
|
2015-01-23 22:49:23 +00:00
|
|
|
err := json.Unmarshal(b, &d)
|
2015-01-23 20:37:53 +00:00
|
|
|
|
2015-01-23 22:49:23 +00:00
|
|
|
resp, err := c.httpClient.Post(c.url.String(), "application/json", bytes.NewBuffer(b))
|
2015-01-23 20:37:53 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
2015-01-29 21:07:43 +00:00
|
|
|
var results Results
|
2015-01-23 20:37:53 +00:00
|
|
|
err = json.NewDecoder(resp.Body).Decode(&results)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2015-01-27 20:27:29 +00:00
|
|
|
return &results, nil
|
2015-01-22 22:23:55 +00:00
|
|
|
}
|
2015-01-22 23:40:32 +00:00
|
|
|
|
2015-01-26 21:12:58 +00:00
|
|
|
func (c *Client) Ping() (time.Duration, string, error) {
|
2015-01-22 23:40:32 +00:00
|
|
|
now := time.Now()
|
2015-01-26 21:12:58 +00:00
|
|
|
u := c.url
|
|
|
|
u.Path = "ping"
|
|
|
|
resp, err := c.httpClient.Get(u.String())
|
2015-01-22 23:40:32 +00:00
|
|
|
if err != nil {
|
2015-01-26 21:12:58 +00:00
|
|
|
return 0, "", err
|
2015-01-22 23:40:32 +00:00
|
|
|
}
|
2015-01-26 21:12:58 +00:00
|
|
|
version := resp.Header.Get("X-Influxdb-Version")
|
|
|
|
return time.Since(now), version, nil
|
2015-01-22 23:40:32 +00:00
|
|
|
}
|
|
|
|
|
2015-01-29 21:07:43 +00:00
|
|
|
// Structs
|
|
|
|
|
|
|
|
// Result represents a resultset returned from a single statement.
|
|
|
|
type Result struct {
|
|
|
|
Rows []*influxql.Row
|
|
|
|
Err error
|
|
|
|
}
|
|
|
|
|
|
|
|
// MarshalJSON encodes the result into JSON.
|
|
|
|
func (r *Result) MarshalJSON() ([]byte, error) {
|
|
|
|
// Define a struct that outputs "error" as a string.
|
|
|
|
var o struct {
|
|
|
|
Rows []*influxql.Row `json:"rows,omitempty"`
|
|
|
|
Err string `json:"error,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// Copy fields to output struct.
|
|
|
|
o.Rows = r.Rows
|
|
|
|
if r.Err != nil {
|
|
|
|
o.Err = r.Err.Error()
|
|
|
|
}
|
|
|
|
|
|
|
|
return json.Marshal(&o)
|
|
|
|
}
|
|
|
|
|
|
|
|
// UnmarshalJSON decodes the data into the Result struct
|
|
|
|
func (r *Result) UnmarshalJSON(b []byte) error {
|
|
|
|
var o struct {
|
|
|
|
Rows []*influxql.Row `json:"rows,omitempty"`
|
|
|
|
Err string `json:"error,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
err := json.Unmarshal(b, &o)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
r.Rows = o.Rows
|
|
|
|
if o.Err != "" {
|
|
|
|
r.Err = errors.New(o.Err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Results represents a list of statement results.
|
|
|
|
type Results struct {
|
|
|
|
Results []*Result
|
|
|
|
Err error
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r Results) MarshalJSON() ([]byte, error) {
|
|
|
|
// Define a struct that outputs "error" as a string.
|
|
|
|
var o struct {
|
|
|
|
Results []*Result `json:"results,omitempty"`
|
|
|
|
Err string `json:"error,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// Copy fields to output struct.
|
|
|
|
o.Results = r.Results
|
|
|
|
if r.Err != nil {
|
|
|
|
o.Err = r.Err.Error()
|
|
|
|
}
|
|
|
|
|
|
|
|
return json.Marshal(&o)
|
|
|
|
}
|
|
|
|
|
|
|
|
// UnmarshalJSON decodes the data into the Results struct
|
|
|
|
func (r *Results) UnmarshalJSON(b []byte) error {
|
|
|
|
var o struct {
|
|
|
|
Results []*Result `json:"results,omitempty"`
|
|
|
|
Err string `json:"error,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
err := json.Unmarshal(b, &o)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
r.Results = o.Results
|
|
|
|
if o.Err != "" {
|
|
|
|
r.Err = errors.New(o.Err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Error returns the first error from any statement.
|
|
|
|
// Returns nil if no errors occurred on any statements.
|
|
|
|
func (a Results) Error() error {
|
|
|
|
for _, r := range a.Results {
|
|
|
|
if r.Err != nil {
|
|
|
|
return r.Err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-01-29 23:33:31 +00:00
|
|
|
// Timestamp wraps time.Time so that it is rendered to JSON as a UTC time
// with nanosecond precision.
type Timestamp time.Time

// Time converts the Timestamp back into a plain time.Time.
func (t Timestamp) Time() time.Time {
	return time.Time(t)
}

// MarshalJSON encodes the timestamp as a quoted RFC 3339 string with
// nanoseconds, always expressed in UTC regardless of the value's location.
func (t Timestamp) MarshalJSON() ([]byte, error) {
	utc := t.Time().UTC()
	quoted := `"` + utc.Format(time.RFC3339Nano) + `"`
	return []byte(quoted), nil
}
|
|
|
|
|
2015-01-29 21:07:43 +00:00
|
|
|
// Point defines the values that will be written to the database
|
|
|
|
type Point struct {
|
2015-01-29 22:16:05 +00:00
|
|
|
Name string `json:"name"`
|
|
|
|
Tags map[string]string `json:"tags"`
|
|
|
|
Timestamp Timestamp `json:"timestamp"`
|
|
|
|
Values map[string]interface{} `json:"values"`
|
2015-01-29 23:44:10 +00:00
|
|
|
Precision string `json:"precision"`
|
2015-01-29 21:07:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// UnmarshalJSON decodes the data into the Point struct
|
|
|
|
func (p *Point) UnmarshalJSON(b []byte) error {
|
|
|
|
var normal struct {
|
|
|
|
Name string `json:"name"`
|
|
|
|
Tags map[string]string `json:"tags"`
|
|
|
|
Timestamp time.Time `json:"timestamp"`
|
2015-01-29 23:22:43 +00:00
|
|
|
Precision string `json:"precision"`
|
2015-01-29 21:07:43 +00:00
|
|
|
Values map[string]interface{} `json:"values"`
|
|
|
|
}
|
|
|
|
var epoch struct {
|
|
|
|
Name string `json:"name"`
|
|
|
|
Tags map[string]string `json:"tags"`
|
2015-02-04 02:30:05 +00:00
|
|
|
Timestamp *int64 `json:"timestamp"`
|
2015-01-29 21:07:43 +00:00
|
|
|
Precision string `json:"precision"`
|
|
|
|
Values map[string]interface{} `json:"values"`
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := func() error {
|
|
|
|
var err error
|
|
|
|
if err = json.Unmarshal(b, &epoch); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-02-04 02:30:05 +00:00
|
|
|
// Convert from epoch to time.Time, but only if Timestamp
|
|
|
|
// was actually set.
|
|
|
|
var ts time.Time
|
|
|
|
if epoch.Timestamp != nil {
|
|
|
|
ts, err = EpochToTime(*epoch.Timestamp, epoch.Precision)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-01-29 21:07:43 +00:00
|
|
|
}
|
|
|
|
p.Name = epoch.Name
|
|
|
|
p.Tags = epoch.Tags
|
2015-01-29 22:16:05 +00:00
|
|
|
p.Timestamp = Timestamp(ts)
|
2015-01-29 23:44:10 +00:00
|
|
|
p.Precision = epoch.Precision
|
2015-01-29 21:07:43 +00:00
|
|
|
p.Values = epoch.Values
|
|
|
|
return nil
|
|
|
|
}(); err == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := json.Unmarshal(b, &normal); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-01-29 23:22:43 +00:00
|
|
|
normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision)
|
2015-01-29 21:07:43 +00:00
|
|
|
p.Name = normal.Name
|
|
|
|
p.Tags = normal.Tags
|
2015-01-29 22:16:05 +00:00
|
|
|
p.Timestamp = Timestamp(normal.Timestamp)
|
2015-01-29 23:44:10 +00:00
|
|
|
p.Precision = normal.Precision
|
2015-01-29 21:07:43 +00:00
|
|
|
p.Values = normal.Values
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-01-22 23:40:32 +00:00
|
|
|
// utility functions
|
|
|
|
|
|
|
|
func (c *Client) Addr() string {
|
2015-01-23 22:49:23 +00:00
|
|
|
return c.url.String()
|
2015-01-22 23:40:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// helper functions
|
|
|
|
|
2015-01-29 23:33:31 +00:00
|
|
|
// EpochToTime converts a Unix epoch value expressed in the given precision
// into a time.Time.
//
// Supported precisions are "h" (hours), "m" (minutes), "s" (seconds),
// "ms" (milliseconds), "u" (microseconds) and "n" (nanoseconds). An empty
// precision is treated as "s". Any other value yields an error and the zero
// time.
func EpochToTime(epoch int64, precision string) (time.Time, error) {
	if precision == "" {
		precision = "s"
	}

	// Units are split into whole seconds plus a sub-second remainder instead
	// of being multiplied out to nanoseconds: a nanosecond count overflows
	// int64 for instants beyond roughly the year 2262, silently producing a
	// wrong time. time.Unix normalizes negative remainders.
	const (
		secondsPerHour   = 3600
		secondsPerMinute = 60
		millisPerSecond  = 1000
		microsPerSecond  = 1000000
		nanosPerMilli    = 1000000
		nanosPerMicro    = 1000
	)

	switch precision {
	case "h":
		return time.Unix(epoch*secondsPerHour, 0), nil
	case "m":
		return time.Unix(epoch*secondsPerMinute, 0), nil
	case "s":
		return time.Unix(epoch, 0), nil
	case "ms":
		return time.Unix(epoch/millisPerSecond, (epoch%millisPerSecond)*nanosPerMilli), nil
	case "u":
		return time.Unix(epoch/microsPerSecond, (epoch%microsPerSecond)*nanosPerMicro), nil
	case "n":
		return time.Unix(0, epoch), nil
	default:
		return time.Time{}, fmt.Errorf("Unknown precision %q", precision)
	}
}
|
|
|
|
|
2015-01-29 23:22:43 +00:00
|
|
|
// SetPrecision rounds t to the unit named by precision: "u" (microsecond),
// "ms" (millisecond), "s" (second), "m" (minute) or "h" (hour). For "n" and
// for any unrecognized precision, t is returned unchanged.
func SetPrecision(t time.Time, precision string) time.Time {
	var unit time.Duration
	switch precision {
	case "u":
		unit = time.Microsecond
	case "ms":
		unit = time.Millisecond
	case "s":
		unit = time.Second
	case "m":
		unit = time.Minute
	case "h":
		unit = time.Hour
	default:
		// Covers "n" as well: nothing to round.
		return t
	}
	return t.Round(unit)
}
|
|
|
|
|
2015-01-22 23:40:32 +00:00
|
|
|
// detect returns the first non-empty string among values, or "" when every
// value is empty or no values are given.
func detect(values ...string) string {
	for i := 0; i < len(values); i++ {
		if candidate := values[i]; len(candidate) > 0 {
			return candidate
		}
	}
	return ""
}
|