// influxdb/server/sources.go

package server
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"github.com/influxdata/chronograf"
"github.com/influxdata/chronograf/influx"
)
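// sourceLinks are the URLs of resources related to a source that are included in API responses.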
type sourceLinks struct {
Self string `json:"self"` // Self link mapping to this resource
Kapacitors string `json:"kapacitors"` // URL for kapacitors endpoint
Proxy string `json:"proxy"` // URL for proxy endpoint
Queries string `json:"queries"` // URL for the queries analysis endpoint
Write string `json:"write"` // URL for the write line-protocol endpoint
Permissions string `json:"permissions"` // URL for all allowed permissions for this source
Users string `json:"users"` // URL for all users associated with this source
Roles string `json:"roles,omitempty"` // URL for all roles associated with this source
Databases string `json:"databases"` // URL for the databases contained within this source
}
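// sourceResponse is the representation of a source returned by the API, along with its related links.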
type sourceResponse struct {
chronograf.Source
Links sourceLinks `json:"links"`
}
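// newSourceResponse builds the API representation of src: it applies the default telegraf database, strips credentials, and attaches links to related resources.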
func newSourceResponse(src chronograf.Source) sourceResponse {
// If telegraf is not set, we'll set it to the default value.
if src.Telegraf == "" {
src.Telegraf = "telegraf"
}
// Omit the password and shared secret on response
src.Password = ""
src.SharedSecret = ""
httpAPISrcs := "/chronograf/v1/sources"
res := sourceResponse{
Source: src,
Links: sourceLinks{
Self: fmt.Sprintf("%s/%d", httpAPISrcs, src.ID),
Kapacitors: fmt.Sprintf("%s/%d/kapacitors", httpAPISrcs, src.ID),
Proxy: fmt.Sprintf("%s/%d/proxy", httpAPISrcs, src.ID),
Queries: fmt.Sprintf("%s/%d/queries", httpAPISrcs, src.ID),
Write: fmt.Sprintf("%s/%d/write", httpAPISrcs, src.ID),
Permissions: fmt.Sprintf("%s/%d/permissions", httpAPISrcs, src.ID),
Users: fmt.Sprintf("%s/%d/users", httpAPISrcs, src.ID),
Databases: fmt.Sprintf("%s/%d/dbs", httpAPISrcs, src.ID),
},
}
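// Advertise the roles endpoint only for Enterprise sources that have a meta URL configured.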
if src.Type == chronograf.InfluxEnterprise && len(src.MetaURL) != 0 {
res.Links.Roles = fmt.Sprintf("%s/%d/roles", httpAPISrcs, src.ID)
}
return res
}
// NewSource adds a new valid source to the store
func (h *Service) NewSource(w http.ResponseWriter, r *http.Request) {
var src chronograf.Source
if err := json.NewDecoder(r.Body).Decode(&src); err != nil {
invalidJSON(w, h.Logger)
return
}
if err := ValidSourceRequest(src); err != nil {
invalidData(w, err, h.Logger)
return
}
// By default the telegraf database will be telegraf
if src.Telegraf == "" {
src.Telegraf = "telegraf"
}
ctx := r.Context()
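// Contact the source to detect which type of time-series database it is.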
dbType, err := h.tsdbType(ctx, &src)
if err != nil {
Error(w, http.StatusBadRequest, "Error contacting source", h.Logger)
return
}
src.Type = dbType
if src, err = h.SourcesStore.Add(ctx, src); err != nil {
msg := fmt.Errorf("Error storing source %v: %v", src, err)
unknownErrorWithMessage(w, msg, h.Logger)
return
}
res := newSourceResponse(src)
w.Header().Add("Location", res.Links.Self)
encodeJSON(w, http.StatusCreated, res, h.Logger)
}
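// tsdbType connects to the source with an influx client and returns the type of time-series database it reports.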
func (h *Service) tsdbType(ctx context.Context, src *chronograf.Source) (string, error) {
cli := &influx.Client{
Logger: h.Logger,
}
if err := cli.Connect(ctx, src); err != nil {
return "", err
}
return cli.Type(ctx)
}
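// getSourcesResponse is the JSON body returned when listing all sources.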
type getSourcesResponse struct {
Sources []sourceResponse `json:"sources"`
}
// Sources returns all sources from the store.
func (h *Service) Sources(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
srcs, err := h.SourcesStore.All(ctx)
if err != nil {
Error(w, http.StatusInternalServerError, "Error loading sources", h.Logger)
return
}
res := getSourcesResponse{
Sources: make([]sourceResponse, len(srcs)),
}
for i, src := range srcs {
res.Sources[i] = newSourceResponse(src)
}
encodeJSON(w, http.StatusOK, res, h.Logger)
}
// SourcesID retrieves a source from the store
func (h *Service) SourcesID(w http.ResponseWriter, r *http.Request) {
id, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error(), h.Logger)
return
}
ctx := r.Context()
src, err := h.SourcesStore.Get(ctx, id)
if err != nil {
notFound(w, id, h.Logger)
return
}
res := newSourceResponse(src)
encodeJSON(w, http.StatusOK, res, h.Logger)
}
// RemoveSource deletes the source from the store
func (h *Service) RemoveSource(w http.ResponseWriter, r *http.Request) {
id, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error(), h.Logger)
return
}
src := chronograf.Source{ID: id}
ctx := r.Context()
if err = h.SourcesStore.Delete(ctx, src); err != nil {
if err == chronograf.ErrSourceNotFound {
notFound(w, id, h.Logger)
} else {
unknownErrorWithMessage(w, err, h.Logger)
}
return
}
// Remove all the associated kapacitors for this source
if err = h.removeSrcsKapa(ctx, id); err != nil {
unknownErrorWithMessage(w, err, h.Logger)
return
}
w.WriteHeader(http.StatusNoContent)
}
// removeSrcsKapa removes all kapacitors associated with the source from the ServersStore.
// However, it will not remove the TICKscripts from Kapacitor itself.
func (h *Service) removeSrcsKapa(ctx context.Context, srcID int) error {
kapas, err := h.ServersStore.All(ctx)
if err != nil {
return err
}
// Filter the kapacitors to delete by matching the source id
deleteKapa := []int{}
for _, kapa := range kapas {
if kapa.SrcID == srcID {
deleteKapa = append(deleteKapa, kapa.ID)
}
}
for _, kapaID := range deleteKapa {
kapa := chronograf.Server{
ID: kapaID,
}
h.Logger.Debug("Deleting kapacitor resource id ", kapa.ID)
if err := h.ServersStore.Delete(ctx, kapa); err != nil {
return err
}
}
return nil
}
// UpdateSource handles incremental updates of a data source
func (h *Service) UpdateSource(w http.ResponseWriter, r *http.Request) {
id, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error(), h.Logger)
return
}
ctx := r.Context()
src, err := h.SourcesStore.Get(ctx, id)
if err != nil {
notFound(w, id, h.Logger)
return
}
var req chronograf.Source
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
invalidJSON(w, h.Logger)
return
}
src.Default = req.Default
src.InsecureSkipVerify = req.InsecureSkipVerify
if req.Name != "" {
src.Name = req.Name
}
if req.Password != "" {
src.Password = req.Password
}
if req.Username != "" {
src.Username = req.Username
}
if req.URL != "" {
src.URL = req.URL
}
if req.MetaURL != "" {
src.MetaURL = req.MetaURL
}
if req.Type != "" {
src.Type = req.Type
}
if req.Telegraf != "" {
src.Telegraf = req.Telegraf
}
if err := ValidSourceRequest(src); err != nil {
invalidData(w, err, h.Logger)
return
}
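// Re-detect the source type, since the URL or credentials may have changed.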
dbType, err := h.tsdbType(ctx, &src)
if err != nil {
Error(w, http.StatusBadRequest, "Error contacting source", h.Logger)
return
}
src.Type = dbType
if err := h.SourcesStore.Update(ctx, src); err != nil {
msg := fmt.Sprintf("Error updating source ID %d", id)
Error(w, http.StatusInternalServerError, msg, h.Logger)
return
}
encodeJSON(w, http.StatusOK, newSourceResponse(src), h.Logger)
}
// ValidSourceRequest checks that the source URL and type are valid
func ValidSourceRequest(s chronograf.Source) error {
// URL is required
if s.URL == "" {
return fmt.Errorf("url required")
}
// Type, if set, must be influx, influx-enterprise, or influx-relay
if s.Type != "" {
if s.Type != chronograf.InfluxDB && s.Type != chronograf.InfluxEnterprise && s.Type != chronograf.InfluxRelay {
return fmt.Errorf("invalid source type %s", s.Type)
}
}
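// The URL must parse and include a scheme.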
u, err := url.ParseRequestURI(s.URL)
if err != nil {
return fmt.Errorf("invalid source URI: %v", err)
}
if len(u.Scheme) == 0 {
return fmt.Errorf("invalid URL; no URL scheme defined")
}
return nil
}
// HandleNewSources parses and persists new sources passed in via server flag
func (h *Service) HandleNewSources(ctx context.Context, input string) error {
if input == "" {
return nil
}
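// The flag value is a JSON array of paired influxdb and kapacitor definitions.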
var srcsKaps []struct {
Source chronograf.Source `json:"influxdb"`
Kapacitor chronograf.Server `json:"kapacitor"`
}
if err := json.Unmarshal([]byte(input), &srcsKaps); err != nil {
h.Logger.
WithField("component", "server").
WithField("NewSources", "invalid").
Error(err)
return err
}
for _, sk := range srcsKaps {
if err := ValidSourceRequest(sk.Source); err != nil {
return err
}
// Add any new sources and kapacitors as specified via server flag
if err := h.newSourceKapacitor(ctx, sk.Source, sk.Kapacitor); err != nil {
// Log the error and return it; the caller decides whether to continue the server run
h.Logger.
WithField("component", "server").
WithField("NewSource", "invalid").
Error(err)
return err
}
}
return nil
}
// newSourceKapacitor idempotently adds a source to BoltDB by name, along with its associated kapacitor
func (h *Service) newSourceKapacitor(ctx context.Context, src chronograf.Source, kapa chronograf.Server) error {
srcs, err := h.SourcesStore.All(ctx)
if err != nil {
return err
}
for _, s := range srcs {
// If source already exists, do nothing
if s.Name == src.Name {
h.Logger.
WithField("component", "server").
WithField("NewSource", s.Name).
Info("Source already exists")
return nil
}
}
src, err = h.SourcesStore.Add(ctx, src)
if err != nil {
return err
}
kapa.SrcID = src.ID
if _, err := h.ServersStore.Add(ctx, kapa); err != nil {
return err
}
return nil
}