Add line-protocol /write endpoint to all data sources
parent
5eac836164
commit
86575b2cde
|
@ -4,6 +4,8 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
)
|
||||
|
@ -20,7 +22,7 @@ type postInfluxResponse struct {
|
|||
Results interface{} `json:"results"` // results from influx
|
||||
}
|
||||
|
||||
// Influx proxies requests to infludb.
|
||||
// Influx proxies requests to influxdb.
|
||||
func (h *Service) Influx(w http.ResponseWriter, r *http.Request) {
|
||||
id, err := paramID("id", r)
|
||||
if err != nil {
|
||||
|
@ -75,3 +77,42 @@ func (h *Service) Influx(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
encodeJSON(w, http.StatusOK, res, h.Logger)
|
||||
}
|
||||
|
||||
func (h *Service) Write(w http.ResponseWriter, r *http.Request) {
|
||||
id, err := paramID("id", r)
|
||||
if err != nil {
|
||||
Error(w, http.StatusUnprocessableEntity, err.Error(), h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
src, err := h.SourcesStore.Get(ctx, id)
|
||||
if err != nil {
|
||||
notFound(w, id, h.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
u, err := url.Parse(src.URL)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Error parsing source url: %v", err)
|
||||
Error(w, http.StatusUnprocessableEntity, msg, h.Logger)
|
||||
return
|
||||
}
|
||||
u.Path = "/write"
|
||||
u.RawQuery = r.URL.RawQuery
|
||||
|
||||
director := func(req *http.Request) {
|
||||
// Set the Host header of the original source URL
|
||||
req.Host = u.Host
|
||||
req.URL = u
|
||||
// Because we are acting as a proxy, influxdb needs to have the
|
||||
// basic auth information set as a header directly
|
||||
if src.Username != "" && src.Password != "" {
|
||||
req.SetBasicAuth(src.Username, src.Password)
|
||||
}
|
||||
}
|
||||
proxy := &httputil.ReverseProxy{
|
||||
Director: director,
|
||||
}
|
||||
proxy.ServeHTTP(w, r)
|
||||
}
|
||||
|
|
|
@ -81,6 +81,9 @@ func NewMux(opts MuxOpts, service Service) http.Handler {
|
|||
influx := gziphandler.GzipHandler(http.HandlerFunc(service.Influx))
|
||||
router.Handler("POST", "/chronograf/v1/sources/:id/proxy", influx)
|
||||
|
||||
// Write proxies line protocol write requests to InfluxDB
|
||||
router.POST("/chronograf/v1/sources/:id/write", service.Write)
|
||||
|
||||
// Queries is used to analyze a specific queries
|
||||
router.POST("/chronograf/v1/sources/:id/queries", service.Queries)
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ type sourceLinks struct {
|
|||
Kapacitors string `json:"kapacitors"` // URL for kapacitors endpoint
|
||||
Proxy string `json:"proxy"` // URL for proxy endpoint
|
||||
Queries string `json:"queries"` // URL for the queries analysis endpoint
|
||||
Write string `json:"write"` // URL for the write line-protocol endpoint
|
||||
Permissions string `json:"permissions"` // URL for all allowed permissions for this source
|
||||
Users string `json:"users"` // URL for all users associated with this source
|
||||
Roles string `json:"roles,omitempty"` // URL for all users associated with this source
|
||||
|
@ -44,6 +45,7 @@ func newSourceResponse(src chronograf.Source) sourceResponse {
|
|||
Kapacitors: fmt.Sprintf("%s/%d/kapacitors", httpAPISrcs, src.ID),
|
||||
Proxy: fmt.Sprintf("%s/%d/proxy", httpAPISrcs, src.ID),
|
||||
Queries: fmt.Sprintf("%s/%d/queries", httpAPISrcs, src.ID),
|
||||
Write: fmt.Sprintf("%s/%d/write", httpAPISrcs, src.ID),
|
||||
Permissions: fmt.Sprintf("%s/%d/permissions", httpAPISrcs, src.ID),
|
||||
Users: fmt.Sprintf("%s/%d/users", httpAPISrcs, src.ID),
|
||||
Databases: fmt.Sprintf("%s/%d/dbs", httpAPISrcs, src.ID),
|
||||
|
|
|
@ -28,6 +28,7 @@ func Test_newSourceResponse(t *testing.T) {
|
|||
Self: "/chronograf/v1/sources/1",
|
||||
Proxy: "/chronograf/v1/sources/1/proxy",
|
||||
Queries: "/chronograf/v1/sources/1/queries",
|
||||
Write: "/chronograf/v1/sources/1/write",
|
||||
Kapacitors: "/chronograf/v1/sources/1/kapacitors",
|
||||
Users: "/chronograf/v1/sources/1/users",
|
||||
Permissions: "/chronograf/v1/sources/1/permissions",
|
||||
|
@ -50,6 +51,7 @@ func Test_newSourceResponse(t *testing.T) {
|
|||
Self: "/chronograf/v1/sources/1",
|
||||
Proxy: "/chronograf/v1/sources/1/proxy",
|
||||
Queries: "/chronograf/v1/sources/1/queries",
|
||||
Write: "/chronograf/v1/sources/1/write",
|
||||
Kapacitors: "/chronograf/v1/sources/1/kapacitors",
|
||||
Users: "/chronograf/v1/sources/1/users",
|
||||
Permissions: "/chronograf/v1/sources/1/permissions",
|
||||
|
|
|
@ -319,6 +319,94 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"/sources/{id}/write": {
|
||||
"post": {
|
||||
"tags": [
|
||||
"sources",
|
||||
"write"
|
||||
],
|
||||
"description": "Write points to the backend time series data source",
|
||||
"parameters": [
|
||||
{
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"type": "string",
|
||||
"description": "ID of the data source",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"name": "query",
|
||||
"in": "body",
|
||||
"description": "Write Parameters",
|
||||
"schema": {
|
||||
"type": "string",
|
||||
"format": "byte"
|
||||
},
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"name": "db",
|
||||
"in": "query",
|
||||
"description": "Sets the target database for the write.",
|
||||
"type": "string",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"name": "rp",
|
||||
"in": "query",
|
||||
"description": "Sets the target retention policy for the write. InfluxDB writes to the DEFAULT retention policy if you do not specify a retention policy.",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "precision",
|
||||
"in": "query",
|
||||
"description": "Sets the precision for the supplied Unix time values. InfluxDB assumes that timestamps are in nanoseconds if you do not specify precision.",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"ns","u","ms","s","m","h"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "consistency",
|
||||
"in": "query",
|
||||
"description": "Sets the write consistency for the point. InfluxDB assumes that the write consistency is one if you do not specify consistency. See the InfluxEnterprise documentation for detailed descriptions of each consistency option.",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"any","one","quorum","all"
|
||||
]
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"204": {
|
||||
"description": "Points written successfuly to database."
|
||||
},
|
||||
"400": {
|
||||
"description": "Any query that results in a data source error (syntax error, etc) will cause this response. The error message will be passed back in the body",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/Error"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "Data source id does not exist.",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/Error"
|
||||
}
|
||||
},
|
||||
"408": {
|
||||
"description": "Timeout trying to query data source.",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/Error"
|
||||
}
|
||||
},
|
||||
"default": {
|
||||
"description": "Unexpected internal service error",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/Error"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/sources/{id}/permissions": {
|
||||
"get": {
|
||||
"tags": [
|
||||
|
@ -2969,6 +3057,7 @@
|
|||
"self": "/chronograf/v1/sources/4",
|
||||
"kapacitors": "/chronograf/v1/sources/4/kapacitors",
|
||||
"proxy": "/chronograf/v1/sources/4/proxy",
|
||||
"write": "/chronograf/v1/sources/4/write",
|
||||
"queries": "/chronograf/v1/sources/4/queries",
|
||||
"permissions": "/chronograf/v1/sources/4/permissions",
|
||||
"users": "/chronograf/v1/sources/4/users",
|
||||
|
@ -3042,6 +3131,11 @@
|
|||
"description": "URL location of proxy endpoint for this source",
|
||||
"format": "url"
|
||||
},
|
||||
"write": {
|
||||
"type": "string",
|
||||
"description": "URL location of write endpoint for this source",
|
||||
"format": "url"
|
||||
},
|
||||
"queries": {
|
||||
"type": "string",
|
||||
"description": "URL location of the queries endpoint for this source",
|
||||
|
|
Loading…
Reference in New Issue