Refactor proxy into influx
parent
26b10dabd7
commit
67746b86c5
|
@ -0,0 +1,70 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
)
|
||||
|
||||
// ValidInfluxRequest checks if queries specify a command.
|
||||
func ValidInfluxRequest(p chronograf.Query) error {
|
||||
if p.Command == "" {
|
||||
return fmt.Errorf("query field required")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type postInfluxResponse struct {
|
||||
Results interface{} `json:"results"` // results from influx
|
||||
}
|
||||
|
||||
// Influx proxies requests to infludb.
|
||||
func (h *Service) Influx(w http.ResponseWriter, r *http.Request) {
|
||||
id, err := paramID("id", r)
|
||||
if err != nil {
|
||||
Error(w, http.StatusUnprocessableEntity, err.Error(), h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
var req chronograf.Query
|
||||
if err = json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
invalidJSON(w, h.Logger)
|
||||
return
|
||||
}
|
||||
if err = ValidInfluxRequest(req); err != nil {
|
||||
invalidData(w, err, h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
src, err := h.SourcesStore.Get(ctx, id)
|
||||
if err != nil {
|
||||
notFound(w, id, h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
if err = h.TimeSeries.Connect(ctx, &src); err != nil {
|
||||
msg := fmt.Sprintf("Unable to connect to source %d", id)
|
||||
Error(w, http.StatusBadRequest, msg, h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
response, err := h.TimeSeries.Query(ctx, req)
|
||||
if err != nil {
|
||||
if err == chronograf.ErrUpstreamTimeout {
|
||||
msg := "Timeout waiting for Influx response"
|
||||
Error(w, http.StatusRequestTimeout, msg, h.Logger)
|
||||
return
|
||||
}
|
||||
// TODO: Here I want to return the error code from influx.
|
||||
Error(w, http.StatusBadRequest, err.Error(), h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
res := postInfluxResponse{
|
||||
Results: response,
|
||||
}
|
||||
encodeJSON(w, http.StatusOK, res, h.Logger)
|
||||
}
|
|
@ -66,8 +66,8 @@ func NewMux(opts MuxOpts, service Service) http.Handler {
|
|||
router.PATCH("/chronograf/v1/sources/:id", service.UpdateSource)
|
||||
router.DELETE("/chronograf/v1/sources/:id", service.RemoveSource)
|
||||
|
||||
// Source Proxy
|
||||
router.POST("/chronograf/v1/sources/:id/proxy", service.Proxy)
|
||||
// Source Proxy to Influx
|
||||
router.POST("/chronograf/v1/sources/:id/proxy", service.Influx)
|
||||
|
||||
// Kapacitor
|
||||
router.GET("/chronograf/v1/sources/:id/kapacitors", service.Kapacitors)
|
||||
|
|
|
@ -2,76 +2,12 @@ package server
|
|||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
)
|
||||
|
||||
// ValidProxyRequest checks if queries specify a command.
|
||||
func ValidProxyRequest(p chronograf.Query) error {
|
||||
if p.Command == "" {
|
||||
return fmt.Errorf("query field required")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type postProxyResponse struct {
|
||||
Results interface{} `json:"results"` // results from influx
|
||||
}
|
||||
|
||||
// Proxy proxies requests to infludb.
|
||||
func (h *Service) Proxy(w http.ResponseWriter, r *http.Request) {
|
||||
id, err := paramID("id", r)
|
||||
if err != nil {
|
||||
Error(w, http.StatusUnprocessableEntity, err.Error(), h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
var req chronograf.Query
|
||||
if err = json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
invalidJSON(w, h.Logger)
|
||||
return
|
||||
}
|
||||
if err = ValidProxyRequest(req); err != nil {
|
||||
invalidData(w, err, h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
src, err := h.SourcesStore.Get(ctx, id)
|
||||
if err != nil {
|
||||
notFound(w, id, h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
if err = h.TimeSeries.Connect(ctx, &src); err != nil {
|
||||
msg := fmt.Sprintf("Unable to connect to source %d", id)
|
||||
Error(w, http.StatusBadRequest, msg, h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
response, err := h.TimeSeries.Query(ctx, req)
|
||||
if err != nil {
|
||||
if err == chronograf.ErrUpstreamTimeout {
|
||||
msg := "Timeout waiting for Influx response"
|
||||
Error(w, http.StatusRequestTimeout, msg, h.Logger)
|
||||
return
|
||||
}
|
||||
// TODO: Here I want to return the error code from influx.
|
||||
Error(w, http.StatusBadRequest, err.Error(), h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
res := postProxyResponse{
|
||||
Results: response,
|
||||
}
|
||||
encodeJSON(w, http.StatusOK, res, h.Logger)
|
||||
}
|
||||
|
||||
// KapacitorProxy proxies requests to kapacitor using the path query parameter.
|
||||
func (h *Service) KapacitorProxy(w http.ResponseWriter, r *http.Request) {
|
||||
srcID, err := paramID("id", r)
|
||||
|
|
Loading…
Reference in New Issue