364 lines
9.3 KiB
Go
364 lines
9.3 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
|
|
"github.com/influxdata/chronograf"
|
|
"github.com/influxdata/chronograf/influx"
|
|
)
|
|
|
|
type sourceLinks struct {
|
|
Self string `json:"self"` // Self link mapping to this resource
|
|
Kapacitors string `json:"kapacitors"` // URL for kapacitors endpoint
|
|
Proxy string `json:"proxy"` // URL for proxy endpoint
|
|
Queries string `json:"queries"` // URL for the queries analysis endpoint
|
|
Write string `json:"write"` // URL for the write line-protocol endpoint
|
|
Permissions string `json:"permissions"` // URL for all allowed permissions for this source
|
|
Users string `json:"users"` // URL for all users associated with this source
|
|
Roles string `json:"roles,omitempty"` // URL for all users associated with this source
|
|
Databases string `json:"databases"` // URL for the databases contained within this soure
|
|
}
|
|
|
|
type sourceResponse struct {
|
|
chronograf.Source
|
|
Links sourceLinks `json:"links"`
|
|
}
|
|
|
|
func newSourceResponse(src chronograf.Source) sourceResponse {
|
|
// If telegraf is not set, we'll set it to the default value.
|
|
if src.Telegraf == "" {
|
|
src.Telegraf = "telegraf"
|
|
}
|
|
|
|
// Omit the password and shared secret on response
|
|
src.Password = ""
|
|
src.SharedSecret = ""
|
|
|
|
httpAPISrcs := "/chronograf/v1/sources"
|
|
res := sourceResponse{
|
|
Source: src,
|
|
Links: sourceLinks{
|
|
Self: fmt.Sprintf("%s/%d", httpAPISrcs, src.ID),
|
|
Kapacitors: fmt.Sprintf("%s/%d/kapacitors", httpAPISrcs, src.ID),
|
|
Proxy: fmt.Sprintf("%s/%d/proxy", httpAPISrcs, src.ID),
|
|
Queries: fmt.Sprintf("%s/%d/queries", httpAPISrcs, src.ID),
|
|
Write: fmt.Sprintf("%s/%d/write", httpAPISrcs, src.ID),
|
|
Permissions: fmt.Sprintf("%s/%d/permissions", httpAPISrcs, src.ID),
|
|
Users: fmt.Sprintf("%s/%d/users", httpAPISrcs, src.ID),
|
|
Databases: fmt.Sprintf("%s/%d/dbs", httpAPISrcs, src.ID),
|
|
},
|
|
}
|
|
|
|
if src.Type == chronograf.InfluxEnterprise {
|
|
res.Links.Roles = fmt.Sprintf("%s/%d/roles", httpAPISrcs, src.ID)
|
|
}
|
|
return res
|
|
}
|
|
|
|
// NewSource adds a new valid source to the store
|
|
func (h *Service) NewSource(w http.ResponseWriter, r *http.Request) {
|
|
var src chronograf.Source
|
|
if err := json.NewDecoder(r.Body).Decode(&src); err != nil {
|
|
invalidJSON(w, h.Logger)
|
|
return
|
|
}
|
|
|
|
if err := ValidSourceRequest(src); err != nil {
|
|
invalidData(w, err, h.Logger)
|
|
return
|
|
}
|
|
|
|
// By default the telegraf database will be telegraf
|
|
if src.Telegraf == "" {
|
|
src.Telegraf = "telegraf"
|
|
}
|
|
|
|
ctx := r.Context()
|
|
dbType, err := h.tsdbType(ctx, &src)
|
|
if err != nil {
|
|
Error(w, http.StatusBadRequest, "Error contacting source", h.Logger)
|
|
return
|
|
}
|
|
|
|
src.Type = dbType
|
|
if src, err = h.SourcesStore.Add(ctx, src); err != nil {
|
|
msg := fmt.Errorf("Error storing source %v: %v", src, err)
|
|
unknownErrorWithMessage(w, msg, h.Logger)
|
|
return
|
|
}
|
|
|
|
res := newSourceResponse(src)
|
|
w.Header().Add("Location", res.Links.Self)
|
|
encodeJSON(w, http.StatusCreated, res, h.Logger)
|
|
}
|
|
|
|
func (h *Service) tsdbType(ctx context.Context, src *chronograf.Source) (string, error) {
|
|
cli := &influx.Client{
|
|
Logger: h.Logger,
|
|
}
|
|
|
|
if err := cli.Connect(ctx, src); err != nil {
|
|
return "", err
|
|
}
|
|
return cli.Type(ctx)
|
|
}
|
|
|
|
type getSourcesResponse struct {
|
|
Sources []sourceResponse `json:"sources"`
|
|
}
|
|
|
|
// Sources returns all sources from the store.
|
|
func (h *Service) Sources(w http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
srcs, err := h.SourcesStore.All(ctx)
|
|
if err != nil {
|
|
Error(w, http.StatusInternalServerError, "Error loading sources", h.Logger)
|
|
return
|
|
}
|
|
|
|
res := getSourcesResponse{
|
|
Sources: make([]sourceResponse, len(srcs)),
|
|
}
|
|
|
|
for i, src := range srcs {
|
|
res.Sources[i] = newSourceResponse(src)
|
|
}
|
|
|
|
encodeJSON(w, http.StatusOK, res, h.Logger)
|
|
}
|
|
|
|
// SourcesID retrieves a source from the store
|
|
func (h *Service) SourcesID(w http.ResponseWriter, r *http.Request) {
|
|
id, err := paramID("id", r)
|
|
if err != nil {
|
|
Error(w, http.StatusUnprocessableEntity, err.Error(), h.Logger)
|
|
return
|
|
}
|
|
|
|
ctx := r.Context()
|
|
src, err := h.SourcesStore.Get(ctx, id)
|
|
if err != nil {
|
|
notFound(w, id, h.Logger)
|
|
return
|
|
}
|
|
|
|
res := newSourceResponse(src)
|
|
encodeJSON(w, http.StatusOK, res, h.Logger)
|
|
}
|
|
|
|
// RemoveSource deletes the source from the store
|
|
func (h *Service) RemoveSource(w http.ResponseWriter, r *http.Request) {
|
|
id, err := paramID("id", r)
|
|
if err != nil {
|
|
Error(w, http.StatusUnprocessableEntity, err.Error(), h.Logger)
|
|
return
|
|
}
|
|
|
|
src := chronograf.Source{ID: id}
|
|
ctx := r.Context()
|
|
if err = h.SourcesStore.Delete(ctx, src); err != nil {
|
|
if err == chronograf.ErrSourceNotFound {
|
|
notFound(w, id, h.Logger)
|
|
} else {
|
|
unknownErrorWithMessage(w, err, h.Logger)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Remove all the associated kapacitors for this source
|
|
if err = h.removeSrcsKapa(ctx, id); err != nil {
|
|
unknownErrorWithMessage(w, err, h.Logger)
|
|
return
|
|
}
|
|
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
// removeSrcsKapa will remove all kapacitors and kapacitor rules from the stores.
|
|
// However, it will not remove the kapacitor tickscript from kapacitor itself.
|
|
func (h *Service) removeSrcsKapa(ctx context.Context, srcID int) error {
|
|
kapas, err := h.ServersStore.All(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Filter the kapacitors to delete by matching the source id
|
|
deleteKapa := []int{}
|
|
for _, kapa := range kapas {
|
|
if kapa.SrcID == srcID {
|
|
deleteKapa = append(deleteKapa, kapa.ID)
|
|
}
|
|
}
|
|
|
|
for _, kapaID := range deleteKapa {
|
|
kapa := chronograf.Server{
|
|
ID: kapaID,
|
|
}
|
|
h.Logger.Debug("Deleting kapacitor resource id ", kapa.ID)
|
|
|
|
if err := h.ServersStore.Delete(ctx, kapa); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateSource handles incremental updates of a data source
|
|
func (h *Service) UpdateSource(w http.ResponseWriter, r *http.Request) {
|
|
id, err := paramID("id", r)
|
|
if err != nil {
|
|
Error(w, http.StatusUnprocessableEntity, err.Error(), h.Logger)
|
|
return
|
|
}
|
|
|
|
ctx := r.Context()
|
|
src, err := h.SourcesStore.Get(ctx, id)
|
|
if err != nil {
|
|
notFound(w, id, h.Logger)
|
|
return
|
|
}
|
|
|
|
var req chronograf.Source
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
invalidJSON(w, h.Logger)
|
|
return
|
|
}
|
|
|
|
src.Default = req.Default
|
|
src.InsecureSkipVerify = req.InsecureSkipVerify
|
|
if req.Name != "" {
|
|
src.Name = req.Name
|
|
}
|
|
if req.Password != "" {
|
|
src.Password = req.Password
|
|
}
|
|
if req.Username != "" {
|
|
src.Username = req.Username
|
|
}
|
|
if req.URL != "" {
|
|
src.URL = req.URL
|
|
}
|
|
if req.MetaURL != "" {
|
|
src.MetaURL = req.MetaURL
|
|
}
|
|
if req.Type != "" {
|
|
src.Type = req.Type
|
|
}
|
|
if req.Telegraf != "" {
|
|
src.Telegraf = req.Telegraf
|
|
}
|
|
|
|
if err := ValidSourceRequest(src); err != nil {
|
|
invalidData(w, err, h.Logger)
|
|
return
|
|
}
|
|
|
|
dbType, err := h.tsdbType(ctx, &src)
|
|
if err != nil {
|
|
Error(w, http.StatusBadRequest, "Error contacting source", h.Logger)
|
|
return
|
|
}
|
|
src.Type = dbType
|
|
|
|
if err := h.SourcesStore.Update(ctx, src); err != nil {
|
|
msg := fmt.Sprintf("Error updating source ID %d", id)
|
|
Error(w, http.StatusInternalServerError, msg, h.Logger)
|
|
return
|
|
}
|
|
encodeJSON(w, http.StatusOK, newSourceResponse(src), h.Logger)
|
|
}
|
|
|
|
// ValidSourceRequest checks if name, url and type are valid
|
|
func ValidSourceRequest(s chronograf.Source) error {
|
|
// Name and URL areq required
|
|
if s.URL == "" {
|
|
return fmt.Errorf("url required")
|
|
}
|
|
// Type must be influx or influx-enterprise
|
|
if s.Type != "" {
|
|
if s.Type != chronograf.InfluxDB && s.Type != chronograf.InfluxEnterprise && s.Type != chronograf.InfluxRelay {
|
|
return fmt.Errorf("invalid source type %s", s.Type)
|
|
}
|
|
}
|
|
|
|
url, err := url.ParseRequestURI(s.URL)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid source URI: %v", err)
|
|
}
|
|
if len(url.Scheme) == 0 {
|
|
return fmt.Errorf("Invalid URL; no URL scheme defined")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// HandleNewSources parses and persists new sources passed in via server flag
|
|
func (h *Service) HandleNewSources(ctx context.Context, input string) error {
|
|
if input == "" {
|
|
return nil
|
|
}
|
|
|
|
var srcsKaps []struct {
|
|
Source chronograf.Source `json:"influxdb"`
|
|
Kapacitor chronograf.Server `json:"kapacitor"`
|
|
}
|
|
if err := json.Unmarshal([]byte(input), &srcsKaps); err != nil {
|
|
h.Logger.
|
|
WithField("component", "server").
|
|
WithField("NewSources", "invalid").
|
|
Error(err)
|
|
return err
|
|
}
|
|
|
|
for _, sk := range srcsKaps {
|
|
if err := ValidSourceRequest(sk.Source); err != nil {
|
|
return err
|
|
}
|
|
// Add any new sources and kapacitors as specified via server flag
|
|
if err := h.newSourceKapacitor(ctx, sk.Source, sk.Kapacitor); err != nil {
|
|
// Continue with server run even if adding NewSource fails
|
|
h.Logger.
|
|
WithField("component", "server").
|
|
WithField("NewSource", "invalid").
|
|
Error(err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// newSourceKapacitor adds sources to BoltDB idempotently by name, as well as respective kapacitors
|
|
func (h *Service) newSourceKapacitor(ctx context.Context, src chronograf.Source, kapa chronograf.Server) error {
|
|
srcs, err := h.SourcesStore.All(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, s := range srcs {
|
|
// If source already exists, do nothing
|
|
if s.Name == src.Name {
|
|
h.Logger.
|
|
WithField("component", "server").
|
|
WithField("NewSource", s.Name).
|
|
Info("Source already exists")
|
|
return nil
|
|
}
|
|
}
|
|
|
|
src, err = h.SourcesStore.Add(ctx, src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
kapa.SrcID = src.ID
|
|
if _, err := h.ServersStore.Add(ctx, kapa); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|