influxdb/client/influxdb.go

150 lines
2.7 KiB
Go
Raw Normal View History

package client
import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/influxdb/influxdb"
)
// defaultAddr is the address NewClient falls back to when Config.Addr is empty.
const defaultAddr = "localhost:8086"
// Config holds the settings NewClient uses to build a Client.
type Config struct {
	// Addr is the server address. NewClient substitutes defaultAddr when it
	// is empty.
	Addr string
	// Username and Password are optional credentials. They are attached to
	// request URLs only when Username is non-empty.
	Username string
	Password string
}
// Client talks to a server over HTTP. Build one with NewClient.
type Client struct {
	// addr is the base address that request paths are appended to.
	addr string
	// username and password are added to request URLs as user info when
	// username is non-empty.
	username string
	password string
	// httpClient performs every request issued by this Client.
	httpClient *http.Client
}
// Query describes a single query request.
type Query struct {
	// Command is the query text, sent as the "q" URL parameter.
	Command string
	// Database is the target database, sent as the "db" URL parameter.
	Database string
}
type Write struct {
2015-01-23 20:37:53 +00:00
Database string
RetentionPolicy string
Points []influxdb.Point
}
func NewClient(c Config) (*Client, error) {
client := Client{
addr: detect(c.Addr, defaultAddr),
2015-01-23 18:05:04 +00:00
username: c.Username,
password: c.Password,
httpClient: &http.Client{},
}
return &client, nil
}
2015-01-23 00:18:24 +00:00
func (c *Client) Query(q Query) (influxdb.Results, error) {
u, err := c.urlFor("/query")
if err != nil {
return nil, err
}
values := u.Query()
values.Set("q", q.Command)
values.Set("db", q.Database)
u.RawQuery = values.Encode()
resp, err := c.httpClient.Get(u.String())
if err != nil {
return nil, err
}
defer resp.Body.Close()
var results influxdb.Results
2015-01-23 18:07:41 +00:00
err = json.NewDecoder(resp.Body).Decode(&results)
2015-01-23 00:18:24 +00:00
if err != nil {
return nil, err
}
return results, nil
}
func (c *Client) Write(writes ...Write) (influxdb.Results, error) {
2015-01-23 20:37:53 +00:00
u, err := c.urlFor("/write")
if err != nil {
return nil, err
}
type data struct {
Points []influxdb.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
}
d := []data{}
for _, write := range writes {
d = append(d, data{Points: write.Points, Database: write.Database, RetentionPolicy: write.RetentionPolicy})
}
b := []byte{}
err = json.Unmarshal(b, &d)
resp, err := c.httpClient.Post(u.String(), "application/json", bytes.NewBuffer(b))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var results influxdb.Results
err = json.NewDecoder(resp.Body).Decode(&results)
if err != nil {
return nil, err
}
return results, nil
}
// Ping issues an HTTP GET against the "/ping" endpoint and returns the time
// elapsed from the start of the call until the response was received.
//
// It returns an error if the URL cannot be built or the request fails. The
// response status code is not inspected.
func (c *Client) Ping() (time.Duration, error) {
	now := time.Now()
	u, err := c.urlFor("/ping")
	if err != nil {
		return 0, err
	}
	resp, err := c.httpClient.Get(u.String())
	if err != nil {
		return 0, err
	}
	// Close the body so the underlying connection is released; the deferred
	// call runs after the elapsed time below has been computed.
	defer resp.Body.Close()
	return time.Since(now), nil
}
// utility functions
// Addr reports the address this client was configured with.
func (c *Client) Addr() string { return c.addr }
2015-01-23 18:05:04 +00:00
// urlFor builds the request URL for path by appending it to the client's
// address. When a username is configured, the username and password are
// attached to the URL as user info.
//
// An address without a scheme (for example "localhost:8086") is treated as
// plain HTTP. Without this, url.Parse either takes the host name for the
// scheme or rejects the colon in the first path segment, and no request
// could ever be sent to such an address.
func (c *Client) urlFor(path string) (*url.URL, error) {
	addr := c.addr
	if !strings.Contains(addr, "://") {
		addr = "http://" + addr
	}
	u, err := url.Parse(fmt.Sprintf("%s%s", addr, path))
	if err != nil {
		return nil, err
	}
	if c.username != "" {
		u.User = url.UserPassword(c.username, c.password)
	}
	return u, nil
}
// helper functions
// detect returns the first non-empty string among values. It returns ""
// when every value is empty or when no values are given.
func detect(values ...string) string {
	for i := 0; i < len(values); i++ {
		if candidate := values[i]; candidate != "" {
			return candidate
		}
	}
	return ""
}