From a25a3623606ca60b4edb81fa44138b306464aae0 Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Mon, 13 Mar 2017 16:45:05 -0500 Subject: [PATCH] Add ping and detection of influxdb source types --- chronograf.go | 22 ++++++++++++ influx/influx.go | 82 +++++++++++++++++++++++++++++++++++++++++++++ server/service.go | 2 +- server/sources.go | 33 +++++++++++++++--- server/swagger.json | 4 ++- 5 files changed, 137 insertions(+), 6 deletions(-) diff --git a/chronograf.go b/chronograf.go index fc624bea4f..7677da7ef7 100644 --- a/chronograf.go +++ b/chronograf.go @@ -47,6 +47,28 @@ type Assets interface { Handler() http.Handler } +// Supported time-series databases +const ( + // InfluxDB is the open-source time-series database + InfluxDB = "influx" + // InfluxEnteprise is the clustered HA time-series database + InfluxEnterprise = "influx-enterprise" + // InfluxRelay is the basic HA layer over InfluxDB + InfluxRelay = "influx-relay" +) + +// TSDBStatus represents the current status of a time series database +type TSDBStatus interface { + // Connect will connect to the time series using the information in `Source`. + Connect(ctx context.Context, src *Source) error + // Ping returns version and TSDB type of time series database if reachable. + Ping(context.Context) error + // Version returns the version of the TSDB database + Version(context.Context) (string, error) + // Type returns the type of the TSDB database + Type(context.Context) (string, error) +} + // TimeSeries represents a queryable time series database. type TimeSeries interface { // Query retrieves time series data from the database. diff --git a/influx/influx.go b/influx/influx.go index 02d8eebf1a..4f08efbf5c 100644 --- a/influx/influx.go +++ b/influx/influx.go @@ -5,13 +5,16 @@ import ( "crypto/tls" "encoding/json" "fmt" + "io/ioutil" "net/http" "net/url" + "strings" "github.com/influxdata/chronograf" ) var _ chronograf.TimeSeries = &Client{} +var _ chronograf.TSDBStatus = &Client{} // Shared transports for all clients to prevent leaking connections var ( @@ -184,3 +187,82 @@ func (c *Client) Users(ctx context.Context) chronograf.UsersStore { func (c *Client) Roles(ctx context.Context) (chronograf.RolesStore, error) { return nil, fmt.Errorf("Roles not support in open-source InfluxDB. Roles are support in Influx Enterprise") } + +// Ping hits the influxdb ping endpoint and returns the type of influx +func (c *Client) Ping(ctx context.Context) error { + _, _, err := c.pingTimeout(ctx) + return err +} + +// Version hits the influxdb ping endpoint and returns the version of influx +func (c *Client) Version(ctx context.Context) (string, error) { + version, _, err := c.pingTimeout(ctx) + return version, err +} + +// Type hits the influxdb ping endpoint and returns the type of influx running +func (c *Client) Type(ctx context.Context) (string, error) { + _, tsdbType, err := c.pingTimeout(ctx) + return tsdbType, err +} + +func (c *Client) pingTimeout(ctx context.Context) (string, string, error) { + resps := make(chan (pingResult)) + go func() { + version, tsdbType, err := c.ping(c.URL) + resps <- pingResult{version, tsdbType, err} + }() + + select { + case resp := <-resps: + return resp.Version, resp.Type, resp.Err + case <-ctx.Done(): + return "", "", chronograf.ErrUpstreamTimeout + } +} + +type pingResult struct { + Version string + Type string + Err error +} + +func (c *Client) ping(u *url.URL) (string, string, error) { + u.Path = "ping" + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return "", "", err + } + + hc := &http.Client{} + if c.InsecureSkipVerify { + hc.Transport = skipVerifyTransport + } else { + hc.Transport = defaultTransport + } + + resp, err := hc.Do(req) + if err != nil { + return "", "", err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", "", err + } + + if resp.StatusCode != http.StatusNoContent { + var err = fmt.Errorf(string(body)) + return "", "", err + } + + version := resp.Header.Get("X-Influxdb-Version") + if strings.Contains(version, "-c") { + return version, chronograf.InfluxEnterprise, nil + } else if strings.Contains(version, "relay") { + return version, chronograf.InfluxRelay, nil + } + return version, chronograf.InfluxDB, nil +} diff --git a/server/service.go b/server/service.go index 8dfa115a4d..6f44b9626f 100644 --- a/server/service.go +++ b/server/service.go @@ -43,7 +43,7 @@ type InfluxClient struct{} // New creates a client to connect to OSS or enterprise func (c *InfluxClient) New(src chronograf.Source, logger chronograf.Logger) (chronograf.TimeSeries, error) { - if src.Type == "influx-enterprise" && src.MetaURL != "" { + if src.Type == chronograf.InfluxEnterprise && src.MetaURL != "" { dataNode := &influx.Client{ Logger: logger, } diff --git a/server/sources.go b/server/sources.go index c53c8f1df3..ef43706118 100644 --- a/server/sources.go +++ b/server/sources.go @@ -8,6 +8,7 @@ import ( "net/url" "github.com/influxdata/chronograf" + "github.com/influxdata/chronograf/influx" ) type sourceLinks struct { @@ -45,7 +46,7 @@ func newSourceResponse(src chronograf.Source) sourceResponse { }, } - if src.Type == "influx-enterprise" { + if src.Type == chronograf.InfluxEnterprise { res.Links.Roles = fmt.Sprintf("%s/%d/roles", httpAPISrcs, src.ID) } return res @@ -69,8 +70,15 @@ func (h *Service) NewSource(w http.ResponseWriter, r *http.Request) { src.Telegraf = "telegraf" } - var err error - if src, err = h.SourcesStore.Add(r.Context(), src); err != nil { + ctx := r.Context() + dbType, err := h.tsdbType(ctx, &src) + if err != nil { + Error(w, http.StatusBadRequest, "Error contacting source", h.Logger) + return + } + + src.Type = dbType + if src, err = h.SourcesStore.Add(ctx, src); err != nil { msg := fmt.Errorf("Error storing source %v: %v", src, err) unknownErrorWithMessage(w, msg, h.Logger) return @@ -81,6 +89,16 @@ func (h *Service) NewSource(w http.ResponseWriter, r *http.Request) { encodeJSON(w, http.StatusCreated, res, h.Logger) } +func (h *Service) tsdbType(ctx context.Context, src *chronograf.Source) (string, error) { + cli := &influx.Client{ + Logger: h.Logger, + } + if err := cli.Connect(ctx, src); err != nil { + return "", err + } + return cli.Type(ctx) +} + type getSourcesResponse struct { Sources []sourceResponse `json:"sources"` } @@ -240,6 +258,13 @@ func (h *Service) UpdateSource(w http.ResponseWriter, r *http.Request) { return } + dbType, err := h.tsdbType(ctx, &src) + if err != nil { + Error(w, http.StatusBadRequest, "Error contacting source", h.Logger) + return + } + src.Type = dbType + if err := h.SourcesStore.Update(ctx, src); err != nil { msg := fmt.Sprintf("Error updating source ID %d", id) Error(w, http.StatusInternalServerError, msg, h.Logger) @@ -256,7 +281,7 @@ func ValidSourceRequest(s chronograf.Source) error { } // Type must be influx or influx-enterprise if s.Type != "" { - if s.Type != "influx" && s.Type != "influx-enterprise" { + if s.Type != chronograf.InfluxDB && s.Type != chronograf.InfluxEnterprise && s.Type != chronograf.InfluxRelay { return fmt.Errorf("invalid source type %s", s.Type) } } diff --git a/server/swagger.json b/server/swagger.json index 4b67b131cb..0e6d73a947 100644 --- a/server/swagger.json +++ b/server/swagger.json @@ -2400,9 +2400,11 @@ "type": { "type": "string", "description": "Format of the data source", + "readOnly": true, "enum": [ "influx", - "influx-enterprise" + "influx-enterprise", + "influx-relay" ] }, "username": {