Add ping and detection of influxdb source types
parent
a1f042a411
commit
a25a362360
|
@ -47,6 +47,28 @@ type Assets interface {
|
|||
Handler() http.Handler
|
||||
}
|
||||
|
||||
// Supported time-series databases
|
||||
const (
|
||||
// InfluxDB is the open-source time-series database
|
||||
InfluxDB = "influx"
|
||||
// InfluxEnteprise is the clustered HA time-series database
|
||||
InfluxEnterprise = "influx-enterprise"
|
||||
// InfluxRelay is the basic HA layer over InfluxDB
|
||||
InfluxRelay = "influx-relay"
|
||||
)
|
||||
|
||||
// TSDBStatus represents the current status of a time series database
|
||||
type TSDBStatus interface {
|
||||
// Connect will connect to the time series using the information in `Source`.
|
||||
Connect(ctx context.Context, src *Source) error
|
||||
// Ping returns version and TSDB type of time series database if reachable.
|
||||
Ping(context.Context) error
|
||||
// Version returns the version of the TSDB database
|
||||
Version(context.Context) (string, error)
|
||||
// Type returns the type of the TSDB database
|
||||
Type(context.Context) (string, error)
|
||||
}
|
||||
|
||||
// TimeSeries represents a queryable time series database.
|
||||
type TimeSeries interface {
|
||||
// Query retrieves time series data from the database.
|
||||
|
|
|
@ -5,13 +5,16 @@ import (
|
|||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
)
|
||||
|
||||
var _ chronograf.TimeSeries = &Client{}
|
||||
var _ chronograf.TSDBStatus = &Client{}
|
||||
|
||||
// Shared transports for all clients to prevent leaking connections
|
||||
var (
|
||||
|
@ -184,3 +187,82 @@ func (c *Client) Users(ctx context.Context) chronograf.UsersStore {
|
|||
func (c *Client) Roles(ctx context.Context) (chronograf.RolesStore, error) {
|
||||
return nil, fmt.Errorf("Roles not support in open-source InfluxDB. Roles are support in Influx Enterprise")
|
||||
}
|
||||
|
||||
// Ping hits the influxdb ping endpoint and returns the type of influx
|
||||
func (c *Client) Ping(ctx context.Context) error {
|
||||
_, _, err := c.pingTimeout(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
// Version hits the influxdb ping endpoint and returns the version of influx
|
||||
func (c *Client) Version(ctx context.Context) (string, error) {
|
||||
version, _, err := c.pingTimeout(ctx)
|
||||
return version, err
|
||||
}
|
||||
|
||||
// Type hits the influxdb ping endpoint and returns the type of influx running
|
||||
func (c *Client) Type(ctx context.Context) (string, error) {
|
||||
_, tsdbType, err := c.pingTimeout(ctx)
|
||||
return tsdbType, err
|
||||
}
|
||||
|
||||
func (c *Client) pingTimeout(ctx context.Context) (string, string, error) {
|
||||
resps := make(chan (pingResult))
|
||||
go func() {
|
||||
version, tsdbType, err := c.ping(c.URL)
|
||||
resps <- pingResult{version, tsdbType, err}
|
||||
}()
|
||||
|
||||
select {
|
||||
case resp := <-resps:
|
||||
return resp.Version, resp.Type, resp.Err
|
||||
case <-ctx.Done():
|
||||
return "", "", chronograf.ErrUpstreamTimeout
|
||||
}
|
||||
}
|
||||
|
||||
type pingResult struct {
|
||||
Version string
|
||||
Type string
|
||||
Err error
|
||||
}
|
||||
|
||||
func (c *Client) ping(u *url.URL) (string, string, error) {
|
||||
u.Path = "ping"
|
||||
|
||||
req, err := http.NewRequest("GET", u.String(), nil)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
hc := &http.Client{}
|
||||
if c.InsecureSkipVerify {
|
||||
hc.Transport = skipVerifyTransport
|
||||
} else {
|
||||
hc.Transport = defaultTransport
|
||||
}
|
||||
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusNoContent {
|
||||
var err = fmt.Errorf(string(body))
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
version := resp.Header.Get("X-Influxdb-Version")
|
||||
if strings.Contains(version, "-c") {
|
||||
return version, chronograf.InfluxEnterprise, nil
|
||||
} else if strings.Contains(version, "relay") {
|
||||
return version, chronograf.InfluxRelay, nil
|
||||
}
|
||||
return version, chronograf.InfluxDB, nil
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ type InfluxClient struct{}
|
|||
|
||||
// New creates a client to connect to OSS or enterprise
|
||||
func (c *InfluxClient) New(src chronograf.Source, logger chronograf.Logger) (chronograf.TimeSeries, error) {
|
||||
if src.Type == "influx-enterprise" && src.MetaURL != "" {
|
||||
if src.Type == chronograf.InfluxEnterprise && src.MetaURL != "" {
|
||||
dataNode := &influx.Client{
|
||||
Logger: logger,
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"net/url"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
"github.com/influxdata/chronograf/influx"
|
||||
)
|
||||
|
||||
type sourceLinks struct {
|
||||
|
@ -45,7 +46,7 @@ func newSourceResponse(src chronograf.Source) sourceResponse {
|
|||
},
|
||||
}
|
||||
|
||||
if src.Type == "influx-enterprise" {
|
||||
if src.Type == chronograf.InfluxEnterprise {
|
||||
res.Links.Roles = fmt.Sprintf("%s/%d/roles", httpAPISrcs, src.ID)
|
||||
}
|
||||
return res
|
||||
|
@ -69,8 +70,15 @@ func (h *Service) NewSource(w http.ResponseWriter, r *http.Request) {
|
|||
src.Telegraf = "telegraf"
|
||||
}
|
||||
|
||||
var err error
|
||||
if src, err = h.SourcesStore.Add(r.Context(), src); err != nil {
|
||||
ctx := r.Context()
|
||||
dbType, err := h.tsdbType(ctx, &src)
|
||||
if err != nil {
|
||||
Error(w, http.StatusBadRequest, "Error contacting source", h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
src.Type = dbType
|
||||
if src, err = h.SourcesStore.Add(ctx, src); err != nil {
|
||||
msg := fmt.Errorf("Error storing source %v: %v", src, err)
|
||||
unknownErrorWithMessage(w, msg, h.Logger)
|
||||
return
|
||||
|
@ -81,6 +89,16 @@ func (h *Service) NewSource(w http.ResponseWriter, r *http.Request) {
|
|||
encodeJSON(w, http.StatusCreated, res, h.Logger)
|
||||
}
|
||||
|
||||
func (h *Service) tsdbType(ctx context.Context, src *chronograf.Source) (string, error) {
|
||||
cli := &influx.Client{
|
||||
Logger: h.Logger,
|
||||
}
|
||||
if err := cli.Connect(ctx, src); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return cli.Type(ctx)
|
||||
}
|
||||
|
||||
type getSourcesResponse struct {
|
||||
Sources []sourceResponse `json:"sources"`
|
||||
}
|
||||
|
@ -240,6 +258,13 @@ func (h *Service) UpdateSource(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
dbType, err := h.tsdbType(ctx, &src)
|
||||
if err != nil {
|
||||
Error(w, http.StatusBadRequest, "Error contacting source", h.Logger)
|
||||
return
|
||||
}
|
||||
src.Type = dbType
|
||||
|
||||
if err := h.SourcesStore.Update(ctx, src); err != nil {
|
||||
msg := fmt.Sprintf("Error updating source ID %d", id)
|
||||
Error(w, http.StatusInternalServerError, msg, h.Logger)
|
||||
|
@ -256,7 +281,7 @@ func ValidSourceRequest(s chronograf.Source) error {
|
|||
}
|
||||
// Type must be influx or influx-enterprise
|
||||
if s.Type != "" {
|
||||
if s.Type != "influx" && s.Type != "influx-enterprise" {
|
||||
if s.Type != chronograf.InfluxDB && s.Type != chronograf.InfluxEnterprise && s.Type != chronograf.InfluxRelay {
|
||||
return fmt.Errorf("invalid source type %s", s.Type)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2400,9 +2400,11 @@
|
|||
"type": {
|
||||
"type": "string",
|
||||
"description": "Format of the data source",
|
||||
"readOnly": true,
|
||||
"enum": [
|
||||
"influx",
|
||||
"influx-enterprise"
|
||||
"influx-enterprise",
|
||||
"influx-relay"
|
||||
]
|
||||
},
|
||||
"username": {
|
||||
|
|
Loading…
Reference in New Issue