From 67746b86c56f753bc5637b50ced0a61b4c0dfa88 Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Fri, 17 Feb 2017 14:02:02 -0600 Subject: [PATCH] Refactor proxy into influx --- server/influx.go | 70 ++++++++++++++++++++++++++++++++++++++++++++++++ server/mux.go | 4 +-- server/proxy.go | 64 ------------------------------------------- 3 files changed, 72 insertions(+), 66 deletions(-) create mode 100644 server/influx.go diff --git a/server/influx.go b/server/influx.go new file mode 100644 index 000000000..d61b4a2aa --- /dev/null +++ b/server/influx.go @@ -0,0 +1,70 @@ +package server + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/influxdata/chronograf" +) + +// ValidInfluxRequest checks if queries specify a command. +func ValidInfluxRequest(p chronograf.Query) error { + if p.Command == "" { + return fmt.Errorf("query field required") + } + return nil +} + +type postInfluxResponse struct { + Results interface{} `json:"results"` // results from influx +} + +// Influx proxies requests to infludb. +func (h *Service) Influx(w http.ResponseWriter, r *http.Request) { + id, err := paramID("id", r) + if err != nil { + Error(w, http.StatusUnprocessableEntity, err.Error(), h.Logger) + return + } + + var req chronograf.Query + if err = json.NewDecoder(r.Body).Decode(&req); err != nil { + invalidJSON(w, h.Logger) + return + } + if err = ValidInfluxRequest(req); err != nil { + invalidData(w, err, h.Logger) + return + } + + ctx := r.Context() + src, err := h.SourcesStore.Get(ctx, id) + if err != nil { + notFound(w, id, h.Logger) + return + } + + if err = h.TimeSeries.Connect(ctx, &src); err != nil { + msg := fmt.Sprintf("Unable to connect to source %d", id) + Error(w, http.StatusBadRequest, msg, h.Logger) + return + } + + response, err := h.TimeSeries.Query(ctx, req) + if err != nil { + if err == chronograf.ErrUpstreamTimeout { + msg := "Timeout waiting for Influx response" + Error(w, http.StatusRequestTimeout, msg, h.Logger) + return + } + // TODO: Here I want to return the error code from influx. + Error(w, http.StatusBadRequest, err.Error(), h.Logger) + return + } + + res := postInfluxResponse{ + Results: response, + } + encodeJSON(w, http.StatusOK, res, h.Logger) +} diff --git a/server/mux.go b/server/mux.go index ecc005723..bd7fab17f 100644 --- a/server/mux.go +++ b/server/mux.go @@ -66,8 +66,8 @@ func NewMux(opts MuxOpts, service Service) http.Handler { router.PATCH("/chronograf/v1/sources/:id", service.UpdateSource) router.DELETE("/chronograf/v1/sources/:id", service.RemoveSource) - // Source Proxy - router.POST("/chronograf/v1/sources/:id/proxy", service.Proxy) + // Source Proxy to Influx + router.POST("/chronograf/v1/sources/:id/proxy", service.Influx) // Kapacitor router.GET("/chronograf/v1/sources/:id/kapacitors", service.Kapacitors) diff --git a/server/proxy.go b/server/proxy.go index 10537db63..cf1cd155d 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -2,76 +2,12 @@ package server import ( "encoding/base64" - "encoding/json" "fmt" "net/http" "net/http/httputil" "net/url" - - "github.com/influxdata/chronograf" ) -// ValidProxyRequest checks if queries specify a command. -func ValidProxyRequest(p chronograf.Query) error { - if p.Command == "" { - return fmt.Errorf("query field required") - } - return nil -} - -type postProxyResponse struct { - Results interface{} `json:"results"` // results from influx -} - -// Proxy proxies requests to infludb. -func (h *Service) Proxy(w http.ResponseWriter, r *http.Request) { - id, err := paramID("id", r) - if err != nil { - Error(w, http.StatusUnprocessableEntity, err.Error(), h.Logger) - return - } - - var req chronograf.Query - if err = json.NewDecoder(r.Body).Decode(&req); err != nil { - invalidJSON(w, h.Logger) - return - } - if err = ValidProxyRequest(req); err != nil { - invalidData(w, err, h.Logger) - return - } - - ctx := r.Context() - src, err := h.SourcesStore.Get(ctx, id) - if err != nil { - notFound(w, id, h.Logger) - return - } - - if err = h.TimeSeries.Connect(ctx, &src); err != nil { - msg := fmt.Sprintf("Unable to connect to source %d", id) - Error(w, http.StatusBadRequest, msg, h.Logger) - return - } - - response, err := h.TimeSeries.Query(ctx, req) - if err != nil { - if err == chronograf.ErrUpstreamTimeout { - msg := "Timeout waiting for Influx response" - Error(w, http.StatusRequestTimeout, msg, h.Logger) - return - } - // TODO: Here I want to return the error code from influx. - Error(w, http.StatusBadRequest, err.Error(), h.Logger) - return - } - - res := postProxyResponse{ - Results: response, - } - encodeJSON(w, http.StatusOK, res, h.Logger) -} - // KapacitorProxy proxies requests to kapacitor using the path query parameter. func (h *Service) KapacitorProxy(w http.ResponseWriter, r *http.Request) { srcID, err := paramID("id", r)