influxdb/server/sources.go

999 lines
26 KiB
Go
Raw Normal View History

2016-10-25 15:20:06 +00:00
package server
import (
"context"
2016-10-25 15:20:06 +00:00
"encoding/json"
"fmt"
"net/http"
"net/url"
"github.com/influxdata/chronograf/enterprise"
"github.com/influxdata/chronograf/organizations"
"github.com/bouk/httprouter"
2016-10-25 15:20:06 +00:00
"github.com/influxdata/chronograf"
"github.com/influxdata/chronograf/influx"
2016-10-25 15:20:06 +00:00
)
type sourceLinks struct {
2017-02-23 22:02:53 +00:00
Self string `json:"self"` // Self link mapping to this resource
Kapacitors string `json:"kapacitors"` // URL for kapacitors endpoint
Services string `json:"services"` // URL for services endpoint
2017-02-23 22:02:53 +00:00
Proxy string `json:"proxy"` // URL for proxy endpoint
Queries string `json:"queries"` // URL for the queries analysis endpoint
Write string `json:"write"` // URL for the write line-protocol endpoint
2017-02-23 22:02:53 +00:00
Permissions string `json:"permissions"` // URL for all allowed permissions for this source
Users string `json:"users"` // URL for all users associated with this source
Roles string `json:"roles,omitempty"` // URL for all users associated with this source
Databases string `json:"databases"` // URL for the databases contained within this source
Annotations string `json:"annotations"` // URL for the annotations of this source
2018-04-03 22:58:33 +00:00
Health string `json:"health"` // URL for source health
2016-10-25 15:20:06 +00:00
}
type sourceResponse struct {
chronograf.Source
AuthenticationMethod string `json:"authentication"`
Links sourceLinks `json:"links"`
2016-10-25 15:20:06 +00:00
}
type authenticationResponse struct {
ID int
AuthenticationMethod string
}
func sourceAuthenticationMethod(ctx context.Context, src chronograf.Source) authenticationResponse {
ldapEnabled := false
if src.MetaURL != "" {
authorizer := influx.DefaultAuthorization(&src)
metaURL, err := url.Parse(src.MetaURL)
if err == nil {
client := enterprise.NewMetaClient(metaURL, src.InsecureSkipVerify, authorizer)
config, err := client.GetLDAPConfig(ctx)
if err == nil {
2018-07-02 21:17:32 +00:00
ldapEnabled = config.Structured.Enabled
}
}
}
if ldapEnabled {
return authenticationResponse{ID: src.ID, AuthenticationMethod: "ldap"}
} else if src.Username != "" && src.Password != "" {
return authenticationResponse{ID: src.ID, AuthenticationMethod: "basic"}
} else if src.SharedSecret != "" {
return authenticationResponse{ID: src.ID, AuthenticationMethod: "shared"}
} else {
return authenticationResponse{ID: src.ID, AuthenticationMethod: "unknown"}
}
}
func newSourceResponse(ctx context.Context, src chronograf.Source) sourceResponse {
// If telegraf is not set, we'll set it to the default value.
if src.Telegraf == "" {
src.Telegraf = "telegraf"
}
authMethod := sourceAuthenticationMethod(ctx, src)
// Omit the password and shared secret on response
2017-02-12 22:38:05 +00:00
src.Password = ""
src.SharedSecret = ""
2017-02-12 22:38:05 +00:00
httpAPISrcs := "/chronograf/v1/sources"
2017-02-23 22:02:53 +00:00
res := sourceResponse{
Source: src,
AuthenticationMethod: authMethod.AuthenticationMethod,
2016-10-25 15:20:06 +00:00
Links: sourceLinks{
Self: fmt.Sprintf("%s/%d", httpAPISrcs, src.ID),
Kapacitors: fmt.Sprintf("%s/%d/kapacitors", httpAPISrcs, src.ID),
Services: fmt.Sprintf("%s/%d/services", httpAPISrcs, src.ID),
Proxy: fmt.Sprintf("%s/%d/proxy", httpAPISrcs, src.ID),
Queries: fmt.Sprintf("%s/%d/queries", httpAPISrcs, src.ID),
Write: fmt.Sprintf("%s/%d/write", httpAPISrcs, src.ID),
Permissions: fmt.Sprintf("%s/%d/permissions", httpAPISrcs, src.ID),
Users: fmt.Sprintf("%s/%d/users", httpAPISrcs, src.ID),
Databases: fmt.Sprintf("%s/%d/dbs", httpAPISrcs, src.ID),
Annotations: fmt.Sprintf("%s/%d/annotations", httpAPISrcs, src.ID),
2018-04-03 22:58:33 +00:00
Health: fmt.Sprintf("%s/%d/health", httpAPISrcs, src.ID),
2016-10-25 15:20:06 +00:00
},
}
2017-02-23 22:02:53 +00:00
// MetaURL is currently a string, but eventually, we'd like to change it
// to a slice. Checking len(src.MetaURL) is functionally equivalent to
// checking if it is equal to the empty string.
if src.Type == chronograf.InfluxEnterprise && len(src.MetaURL) != 0 {
2017-02-23 22:02:53 +00:00
res.Links.Roles = fmt.Sprintf("%s/%d/roles", httpAPISrcs, src.ID)
}
return res
2016-10-25 15:20:06 +00:00
}
// NewSource adds a new valid source to the store
func (s *Service) NewSource(w http.ResponseWriter, r *http.Request) {
2016-10-25 15:20:06 +00:00
var src chronograf.Source
if err := json.NewDecoder(r.Body).Decode(&src); err != nil {
invalidJSON(w, s.Logger)
2016-10-25 15:20:06 +00:00
return
}
ctx := r.Context()
defaultOrg, err := s.Store.Organizations(ctx).DefaultOrganization(ctx)
if err != nil {
unknownErrorWithMessage(w, err, s.Logger)
return
}
if err := ValidSourceRequest(&src, defaultOrg.ID); err != nil {
invalidData(w, err, s.Logger)
2016-10-25 15:20:06 +00:00
return
}
// By default the telegraf database will be telegraf
if src.Telegraf == "" {
src.Telegraf = "telegraf"
}
dbType, err := s.tsdbType(ctx, &src)
if err != nil {
Error(w, http.StatusBadRequest, "Error contacting source", s.Logger)
return
}
src.Type = dbType
if src, err = s.Store.Sources(ctx).Add(ctx, src); err != nil {
2016-10-25 15:20:06 +00:00
msg := fmt.Errorf("Error storing source %v: %v", src, err)
unknownErrorWithMessage(w, msg, s.Logger)
2016-10-25 15:20:06 +00:00
return
}
res := newSourceResponse(ctx, src)
location(w, res.Links.Self)
encodeJSON(w, http.StatusCreated, res, s.Logger)
2016-10-25 15:20:06 +00:00
}
func (s *Service) tsdbType(ctx context.Context, src *chronograf.Source) (string, error) {
cli := &influx.Client{
Logger: s.Logger,
}
if err := cli.Connect(ctx, src); err != nil {
return "", err
}
return cli.Type(ctx)
}
2016-10-25 15:20:06 +00:00
type getSourcesResponse struct {
Sources []sourceResponse `json:"sources"`
}
// Sources returns all sources from the store.
func (s *Service) Sources(w http.ResponseWriter, r *http.Request) {
2016-10-25 15:20:06 +00:00
ctx := r.Context()
srcs, err := s.Store.Sources(ctx).All(ctx)
2016-10-25 15:20:06 +00:00
if err != nil {
Error(w, http.StatusInternalServerError, "Error loading sources", s.Logger)
2016-10-25 15:20:06 +00:00
return
}
res := getSourcesResponse{
Sources: make([]sourceResponse, len(srcs)),
}
var sources []sourceResponse
for _, src := range srcs {
sources = append(sources, newSourceResponse(ctx, src))
2016-10-25 15:20:06 +00:00
}
res.Sources = sources
encodeJSON(w, http.StatusOK, res, s.Logger)
2016-10-25 15:20:06 +00:00
}
// SourcesID retrieves a source from the store
func (s *Service) SourcesID(w http.ResponseWriter, r *http.Request) {
2016-10-25 15:20:06 +00:00
id, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error(), s.Logger)
2016-10-25 15:20:06 +00:00
return
}
ctx := r.Context()
src, err := s.Store.Sources(ctx).Get(ctx, id)
2016-10-25 15:20:06 +00:00
if err != nil {
notFound(w, id, s.Logger)
2016-10-25 15:20:06 +00:00
return
}
res := newSourceResponse(ctx, src)
encodeJSON(w, http.StatusOK, res, s.Logger)
2016-10-25 15:20:06 +00:00
}
// RemoveSource deletes the source from the store
func (s *Service) RemoveSource(w http.ResponseWriter, r *http.Request) {
2016-10-25 15:20:06 +00:00
id, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error(), s.Logger)
2016-10-25 15:20:06 +00:00
return
}
src := chronograf.Source{ID: id}
ctx := r.Context()
if err = s.Store.Sources(ctx).Delete(ctx, src); err != nil {
Redirect to default source when deleting sources (#1074) * Redirect to default source on invalid source ID When supplied with an invalid source ID, the CheckSources component would redirect the user to a "Create Source" page. This caused surprising behavior when a source was deleted because that source ID would become invalid. The effect being that deleting a source brought users immediately to the create source page, rather than back to the sources list. This instead redirects users to the default source when provided an invalid source id. The backend automatically re-assigns the "default" source, so this will always succeed, since sources are fetched again from the backend. The regex used is slightly dependent on URL structure that has been stable over the lifetime of this project. Also it relies on URL structure more than the previous redirecting implementation. * Force sources to reload after deletion Deleting a source invalidates the state held by the client because of automatic re-assignment of the default source by the backend. Without duplicating backend logic, it is impossible for the frontend to discover the new source without reloading sources. The ManageSources page now uses an async-action creator which deletes the requested source and reloads all sources. The source action creators have also been refactored to use implicit returns like other action creators. * Remove Dead removeSource action removeSource is no longer used because the API invalidates its assumptions. For more information, see 04bf3ca. * Update Changelog with source deletion redirect fix Users are no longer unexpectedly redirected to the "create source" page whenever they delete a source that they are connected to. * Return 404 when deleting non-existent source When deleting a source, a new default is assigned automatically. If a non-existent source ID was provided, previously this would result in a 500. This is a violation of the Swagger docs. The solution is to examine the error and if it was an ErrSourceNotFound, invoke the notFound handler. * Add Error handling to source deletion There are two kinds of errors that can be encountered when deleting a source: a 404 and a 500 (from either the delete or the subsequent fetch). The 404 is a precondition failure of the action creator. The source.id requested can be non-existent for two reasons: 1) The action creator was passed garbage by the caller. 2) A concurrent write occurred which silently invalidated this session's state. For the first case, we can ensure that the caller is sane by having an assertion check that the requested source is among some set of sources. This could be circumvented by a caller, but chances are good that both the full set of sources and the desired source are both available to callers of this action creator. The second case is not an error. In this case, we should proceed reloading sources, since the deletion that was requested has already been performed by someone else. Finally, 500s can only occur if there is something broken with the API. In this situation, we provide a notification that tells the user to check the API logs for more information. * Remove duplicate CHANGELOG entries These were introduced due to a naive merge conflict resolution. * Remove assertion This was decided to be confusing and unnecessary. * Remove remnants of removed assertion These were needed for an assertion that has been removed. It's no longer necessary to pass `sources` to the action creator.
2017-03-28 15:53:11 +00:00
if err == chronograf.ErrSourceNotFound {
notFound(w, id, s.Logger)
Redirect to default source when deleting sources (#1074) * Redirect to default source on invalid source ID When supplied with an invalid source ID, the CheckSources component would redirect the user to a "Create Source" page. This caused surprising behavior when a source was deleted because that source ID would become invalid. The effect being that deleting a source brought users immediately to the create source page, rather than back to the sources list. This instead redirects users to the default source when provided an invalid source id. The backend automatically re-assigns the "default" source, so this will always succeed, since sources are fetched again from the backend. The regex used is slightly dependent on URL structure that has been stable over the lifetime of this project. Also it relies on URL structure more than the previous redirecting implementation. * Force sources to reload after deletion Deleting a source invalidates the state held by the client because of automatic re-assignment of the default source by the backend. Without duplicating backend logic, it is impossible for the frontend to discover the new source without reloading sources. The ManageSources page now uses an async-action creator which deletes the requested source and reloads all sources. The source action creators have also been refactored to use implicit returns like other action creators. * Remove Dead removeSource action removeSource is no longer used because the API invalidates its assumptions. For more information, see 04bf3ca. * Update Changelog with source deletion redirect fix Users are no longer unexpectedly redirected to the "create source" page whenever they delete a source that they are connected to. * Return 404 when deleting non-existent source When deleting a source, a new default is assigned automatically. If a non-existent source ID was provided, previously this would result in a 500. This is a violation of the Swagger docs. The solution is to examine the error and if it was an ErrSourceNotFound, invoke the notFound handler. * Add Error handling to source deletion There are two kinds of errors that can be encountered when deleting a source: a 404 and a 500 (from either the delete or the subsequent fetch). The 404 is a precondition failure of the action creator. The source.id requested can be non-existent for two reasons: 1) The action creator was passed garbage by the caller. 2) A concurrent write occurred which silently invalidated this session's state. For the first case, we can ensure that the caller is sane by having an assertion check that the requested source is among some set of sources. This could be circumvented by a caller, but chances are good that both the full set of sources and the desired source are both available to callers of this action creator. The second case is not an error. In this case, we should proceed reloading sources, since the deletion that was requested has already been performed by someone else. Finally, 500s can only occur if there is something broken with the API. In this situation, we provide a notification that tells the user to check the API logs for more information. * Remove duplicate CHANGELOG entries These were introduced due to a naive merge conflict resolution. * Remove assertion This was decided to be confusing and unnecessary. * Remove remnants of removed assertion These were needed for an assertion that has been removed. It's no longer necessary to pass `sources` to the action creator.
2017-03-28 15:53:11 +00:00
} else {
unknownErrorWithMessage(w, err, s.Logger)
Redirect to default source when deleting sources (#1074) * Redirect to default source on invalid source ID When supplied with an invalid source ID, the CheckSources component would redirect the user to a "Create Source" page. This caused surprising behavior when a source was deleted because that source ID would become invalid. The effect being that deleting a source brought users immediately to the create source page, rather than back to the sources list. This instead redirects users to the default source when provided an invalid source id. The backend automatically re-assigns the "default" source, so this will always succeed, since sources are fetched again from the backend. The regex used is slightly dependent on URL structure that has been stable over the lifetime of this project. Also it relies on URL structure more than the previous redirecting implementation. * Force sources to reload after deletion Deleting a source invalidates the state held by the client because of automatic re-assignment of the default source by the backend. Without duplicating backend logic, it is impossible for the frontend to discover the new source without reloading sources. The ManageSources page now uses an async-action creator which deletes the requested source and reloads all sources. The source action creators have also been refactored to use implicit returns like other action creators. * Remove Dead removeSource action removeSource is no longer used because the API invalidates its assumptions. For more information, see 04bf3ca. * Update Changelog with source deletion redirect fix Users are no longer unexpectedly redirected to the "create source" page whenever they delete a source that they are connected to. * Return 404 when deleting non-existent source When deleting a source, a new default is assigned automatically. If a non-existent source ID was provided, previously this would result in a 500. This is a violation of the Swagger docs. The solution is to examine the error and if it was an ErrSourceNotFound, invoke the notFound handler. * Add Error handling to source deletion There are two kinds of errors that can be encountered when deleting a source: a 404 and a 500 (from either the delete or the subsequent fetch). The 404 is a precondition failure of the action creator. The source.id requested can be non-existent for two reasons: 1) The action creator was passed garbage by the caller. 2) A concurrent write occurred which silently invalidated this session's state. For the first case, we can ensure that the caller is sane by having an assertion check that the requested source is among some set of sources. This could be circumvented by a caller, but chances are good that both the full set of sources and the desired source are both available to callers of this action creator. The second case is not an error. In this case, we should proceed reloading sources, since the deletion that was requested has already been performed by someone else. Finally, 500s can only occur if there is something broken with the API. In this situation, we provide a notification that tells the user to check the API logs for more information. * Remove duplicate CHANGELOG entries These were introduced due to a naive merge conflict resolution. * Remove assertion This was decided to be confusing and unnecessary. * Remove remnants of removed assertion These were needed for an assertion that has been removed. It's no longer necessary to pass `sources` to the action creator.
2017-03-28 15:53:11 +00:00
}
2016-10-25 15:20:06 +00:00
return
}
// Remove all the associated kapacitors for this source
if err = s.removeSrcsKapa(ctx, id); err != nil {
unknownErrorWithMessage(w, err, s.Logger)
return
}
2016-10-25 15:20:06 +00:00
w.WriteHeader(http.StatusNoContent)
}
2018-04-03 22:58:33 +00:00
// SourceHealth determines if the tsdb is running
func (s *Service) SourceHealth(w http.ResponseWriter, r *http.Request) {
id, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error(), s.Logger)
return
}
ctx := r.Context()
src, err := s.Store.Sources(ctx).Get(ctx, id)
if err != nil {
notFound(w, id, s.Logger)
return
}
cli := &influx.Client{
Logger: s.Logger,
}
if err := cli.Connect(ctx, &src); err != nil {
Error(w, http.StatusBadRequest, "Error contacting source", s.Logger)
return
}
if err := cli.Ping(ctx); err != nil {
Error(w, http.StatusBadRequest, "Error contacting source", s.Logger)
return
}
w.WriteHeader(http.StatusOK)
}
// removeSrcsKapa will remove all kapacitors and kapacitor rules from the stores.
// However, it will not remove the kapacitor tickscript from kapacitor itself.
func (s *Service) removeSrcsKapa(ctx context.Context, srcID int) error {
kapas, err := s.Store.Servers(ctx).All(ctx)
if err != nil {
return err
}
// Filter the kapacitors to delete by matching the source id
deleteKapa := []int{}
for _, kapa := range kapas {
if kapa.SrcID == srcID {
deleteKapa = append(deleteKapa, kapa.ID)
}
}
for _, kapaID := range deleteKapa {
kapa := chronograf.Server{
ID: kapaID,
}
s.Logger.Debug("Deleting kapacitor resource id ", kapa.ID)
if err := s.Store.Servers(ctx).Delete(ctx, kapa); err != nil {
return err
}
}
return nil
}
// UpdateSource handles incremental updates of a data source
func (s *Service) UpdateSource(w http.ResponseWriter, r *http.Request) {
2016-10-25 15:20:06 +00:00
id, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error(), s.Logger)
2016-10-25 15:20:06 +00:00
return
}
ctx := r.Context()
src, err := s.Store.Sources(ctx).Get(ctx, id)
2016-10-25 15:20:06 +00:00
if err != nil {
notFound(w, id, s.Logger)
2016-10-25 15:20:06 +00:00
return
}
var req chronograf.Source
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
invalidJSON(w, s.Logger)
2016-10-25 15:20:06 +00:00
return
}
src.Default = req.Default
src.InsecureSkipVerify = req.InsecureSkipVerify
2016-10-25 15:20:06 +00:00
if req.Name != "" {
src.Name = req.Name
}
if req.Password != "" {
src.Password = req.Password
}
if req.Username != "" {
src.Username = req.Username
}
if req.URL != "" {
src.URL = req.URL
}
// If the supplied MetaURL is different from the
// one supplied on the request, update the value
if req.MetaURL != src.MetaURL {
2017-02-07 23:57:51 +00:00
src.MetaURL = req.MetaURL
}
2016-10-25 15:20:06 +00:00
if req.Type != "" {
src.Type = req.Type
}
if req.Telegraf != "" {
src.Telegraf = req.Telegraf
}
src.DefaultRP = req.DefaultRP
2016-10-25 15:20:06 +00:00
defaultOrg, err := s.Store.Organizations(ctx).DefaultOrganization(ctx)
if err != nil {
unknownErrorWithMessage(w, err, s.Logger)
return
}
if err := ValidSourceRequest(&src, defaultOrg.ID); err != nil {
invalidData(w, err, s.Logger)
2016-10-25 15:20:06 +00:00
return
}
dbType, err := s.tsdbType(ctx, &src)
if err != nil {
Error(w, http.StatusBadRequest, "Error contacting source", s.Logger)
return
}
src.Type = dbType
if err := s.Store.Sources(ctx).Update(ctx, src); err != nil {
2016-10-25 15:20:06 +00:00
msg := fmt.Sprintf("Error updating source ID %d", id)
Error(w, http.StatusInternalServerError, msg, s.Logger)
2016-10-25 15:20:06 +00:00
return
}
encodeJSON(w, http.StatusOK, newSourceResponse(context.Background(), src), s.Logger)
2016-10-25 15:20:06 +00:00
}
2017-11-10 17:20:58 +00:00
// ValidSourceRequest checks if name, url, type, and role are valid
func ValidSourceRequest(s *chronograf.Source, defaultOrgID string) error {
if s == nil {
return fmt.Errorf("source must be non-nil")
}
2016-10-25 15:20:06 +00:00
// Name and URL areq required
2017-03-13 22:15:01 +00:00
if s.URL == "" {
return fmt.Errorf("url required")
2016-10-25 15:20:06 +00:00
}
// Type must be influx or influx-enterprise
if s.Type != "" {
if s.Type != chronograf.InfluxDB && s.Type != chronograf.InfluxEnterprise && s.Type != chronograf.InfluxRelay {
2016-10-25 15:20:06 +00:00
return fmt.Errorf("invalid source type %s", s.Type)
}
}
2017-10-25 15:51:15 +00:00
if s.Organization == "" {
s.Organization = defaultOrgID
2017-10-25 15:51:15 +00:00
}
2016-10-25 15:20:06 +00:00
url, err := url.ParseRequestURI(s.URL)
if err != nil {
return fmt.Errorf("invalid source URI: %v", err)
}
if len(url.Scheme) == 0 {
return fmt.Errorf("Invalid URL; no URL scheme defined")
}
2016-10-25 15:20:06 +00:00
return nil
}
// HandleNewSources parses and persists new sources passed in via server flag
func (s *Service) HandleNewSources(ctx context.Context, input string) error {
if input == "" {
return nil
}
s.Logger.Error("--new-sources is deprecated and will be removed in a future version.")
var srcsKaps []struct {
Source chronograf.Source `json:"influxdb"`
Kapacitor chronograf.Server `json:"kapacitor"`
}
if err := json.Unmarshal([]byte(input), &srcsKaps); err != nil {
s.Logger.
WithField("component", "server").
WithField("NewSources", "invalid").
Error(err)
return err
}
ctx = context.WithValue(ctx, organizations.ContextKey, "default")
defaultOrg, err := s.Store.Organizations(ctx).DefaultOrganization(ctx)
if err != nil {
return err
}
for _, sk := range srcsKaps {
if err := ValidSourceRequest(&sk.Source, defaultOrg.ID); err != nil {
return err
}
// Add any new sources and kapacitors as specified via server flag
if err := s.newSourceKapacitor(ctx, sk.Source, sk.Kapacitor); err != nil {
// Continue with server run even if adding NewSource fails
s.Logger.
WithField("component", "server").
WithField("NewSource", "invalid").
Error(err)
return err
}
}
return nil
}
// newSourceKapacitor adds sources to BoltDB idempotently by name, as well as respective kapacitors
func (s *Service) newSourceKapacitor(ctx context.Context, src chronograf.Source, kapa chronograf.Server) error {
srcs, err := s.Store.Sources(ctx).All(ctx)
if err != nil {
return err
}
for _, source := range srcs {
// If source already exists, do nothing
if source.Name == src.Name {
s.Logger.
WithField("component", "server").
WithField("NewSource", source.Name).
Info("Source already exists")
return nil
}
}
src, err = s.Store.Sources(ctx).Add(ctx, src)
if err != nil {
return err
}
kapa.SrcID = src.ID
if _, err := s.Store.Servers(ctx).Add(ctx, kapa); err != nil {
return err
}
return nil
}
// NewSourceUser adds user to source
func (s *Service) NewSourceUser(w http.ResponseWriter, r *http.Request) {
var req sourceUserRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
invalidJSON(w, s.Logger)
return
}
if err := req.ValidCreate(); err != nil {
invalidData(w, err, s.Logger)
return
}
ctx := r.Context()
srcID, ts, err := s.sourcesSeries(ctx, w, r)
if err != nil {
return
}
store := ts.Users(ctx)
user := &chronograf.User{
Name: req.Username,
Passwd: req.Password,
Permissions: req.Permissions,
Roles: req.Roles,
}
res, err := store.Add(ctx, user)
if err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
if err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
su := newSourceUserResponse(srcID, res.Name).WithPermissions(res.Permissions)
if _, hasRoles := s.hasRoles(ctx, ts); hasRoles {
su.WithRoles(srcID, res.Roles)
}
location(w, su.Links.Self)
encodeJSON(w, http.StatusCreated, su, s.Logger)
}
// SourceUsers retrieves all users from source.
func (s *Service) SourceUsers(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
srcID, ts, err := s.sourcesSeries(ctx, w, r)
if err != nil {
return
}
store := ts.Users(ctx)
users, err := store.All(ctx)
if err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
_, hasRoles := s.hasRoles(ctx, ts)
ur := make([]sourceUserResponse, len(users))
for i, u := range users {
usr := newSourceUserResponse(srcID, u.Name).WithPermissions(u.Permissions)
if hasRoles {
usr.WithRoles(srcID, u.Roles)
}
ur[i] = *usr
}
res := sourceUsersResponse{
Users: ur,
}
encodeJSON(w, http.StatusOK, res, s.Logger)
}
// SourceUserID retrieves a user with ID from store.
// In InfluxDB, a User's Name is their UID, hence the semantic below.
func (s *Service) SourceUserID(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
uid := httprouter.GetParamFromContext(ctx, "uid")
srcID, ts, err := s.sourcesSeries(ctx, w, r)
if err != nil {
return
}
store := ts.Users(ctx)
u, err := store.Get(ctx, chronograf.UserQuery{Name: &uid})
if err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
res := newSourceUserResponse(srcID, u.Name).WithPermissions(u.Permissions)
if _, hasRoles := s.hasRoles(ctx, ts); hasRoles {
res.WithRoles(srcID, u.Roles)
}
encodeJSON(w, http.StatusOK, res, s.Logger)
}
// RemoveSourceUser removes the user from the InfluxDB source
func (s *Service) RemoveSourceUser(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
uid := httprouter.GetParamFromContext(ctx, "uid")
_, store, err := s.sourceUsersStore(ctx, w, r)
if err != nil {
return
}
if err := store.Delete(ctx, &chronograf.User{Name: uid}); err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
w.WriteHeader(http.StatusNoContent)
}
// UpdateSourceUser changes the password or permissions of a source user
func (s *Service) UpdateSourceUser(w http.ResponseWriter, r *http.Request) {
var req sourceUserRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
invalidJSON(w, s.Logger)
return
}
if err := req.ValidUpdate(); err != nil {
invalidData(w, err, s.Logger)
return
}
ctx := r.Context()
uid := httprouter.GetParamFromContext(ctx, "uid")
srcID, ts, err := s.sourcesSeries(ctx, w, r)
if err != nil {
return
}
user := &chronograf.User{
Name: uid,
Passwd: req.Password,
Permissions: req.Permissions,
Roles: req.Roles,
}
store := ts.Users(ctx)
if err := store.Update(ctx, user); err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
u, err := store.Get(ctx, chronograf.UserQuery{Name: &uid})
if err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
res := newSourceUserResponse(srcID, u.Name).WithPermissions(u.Permissions)
if _, hasRoles := s.hasRoles(ctx, ts); hasRoles {
res.WithRoles(srcID, u.Roles)
}
location(w, res.Links.Self)
encodeJSON(w, http.StatusOK, res, s.Logger)
}
func (s *Service) sourcesSeries(ctx context.Context, w http.ResponseWriter, r *http.Request) (int, chronograf.TimeSeries, error) {
srcID, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error(), s.Logger)
return 0, nil, err
}
src, err := s.Store.Sources(ctx).Get(ctx, srcID)
if err != nil {
notFound(w, srcID, s.Logger)
return 0, nil, err
}
ts, err := s.TimeSeries(src)
if err != nil {
msg := fmt.Sprintf("Unable to connect to source %d: %v", srcID, err)
Error(w, http.StatusBadRequest, msg, s.Logger)
return 0, nil, err
}
if err = ts.Connect(ctx, &src); err != nil {
msg := fmt.Sprintf("Unable to connect to source %d: %v", srcID, err)
Error(w, http.StatusBadRequest, msg, s.Logger)
return 0, nil, err
}
return srcID, ts, nil
}
func (s *Service) sourceUsersStore(ctx context.Context, w http.ResponseWriter, r *http.Request) (int, chronograf.UsersStore, error) {
srcID, ts, err := s.sourcesSeries(ctx, w, r)
if err != nil {
return 0, nil, err
}
store := ts.Users(ctx)
return srcID, store, nil
}
// hasRoles checks if the influx source has roles or not
func (s *Service) hasRoles(ctx context.Context, ts chronograf.TimeSeries) (chronograf.RolesStore, bool) {
store, err := ts.Roles(ctx)
if err != nil {
return nil, false
}
return store, true
}
type sourceUserRequest struct {
Username string `json:"name,omitempty"` // Username for new account
Password string `json:"password,omitempty"` // Password for new account
Permissions chronograf.Permissions `json:"permissions,omitempty"` // Optional permissions
Roles []chronograf.Role `json:"roles,omitempty"` // Optional roles
}
func (r *sourceUserRequest) ValidCreate() error {
if r.Username == "" {
return fmt.Errorf("Username required")
}
if r.Password == "" {
return fmt.Errorf("Password required")
}
return validPermissions(&r.Permissions)
}
type sourceUsersResponse struct {
Users []sourceUserResponse `json:"users"`
}
func (r *sourceUserRequest) ValidUpdate() error {
2018-04-03 00:09:31 +00:00
if r.Password == "" && r.Permissions == nil && r.Roles == nil {
return fmt.Errorf("No fields to update")
}
return validPermissions(&r.Permissions)
}
type sourceUserResponse struct {
Name string // Username for new account
Permissions chronograf.Permissions // Account's permissions
Roles []sourceRoleResponse // Roles if source uses them
Links selfLinks // Links are URI locations related to user
hasPermissions bool
hasRoles bool
}
func (u *sourceUserResponse) MarshalJSON() ([]byte, error) {
res := map[string]interface{}{
"name": u.Name,
"links": u.Links,
}
if u.hasRoles {
res["roles"] = u.Roles
}
if u.hasPermissions {
res["permissions"] = u.Permissions
}
return json.Marshal(res)
}
// newSourceUserResponse creates an HTTP JSON response for a user w/o roles
func newSourceUserResponse(srcID int, name string) *sourceUserResponse {
self := newSelfLinks(srcID, "users", name)
return &sourceUserResponse{
Name: name,
Links: self,
}
}
func (u *sourceUserResponse) WithPermissions(perms chronograf.Permissions) *sourceUserResponse {
u.hasPermissions = true
if perms == nil {
perms = make(chronograf.Permissions, 0)
}
u.Permissions = perms
return u
}
// WithRoles adds roles to the HTTP JSON response for a user
func (u *sourceUserResponse) WithRoles(srcID int, roles []chronograf.Role) *sourceUserResponse {
u.hasRoles = true
rr := make([]sourceRoleResponse, len(roles))
for i, role := range roles {
rr[i] = newSourceRoleResponse(srcID, &role)
}
u.Roles = rr
return u
}
type selfLinks struct {
Self string `json:"self"` // Self link mapping to this resource
}
func newSelfLinks(id int, parent, resource string) selfLinks {
httpAPISrcs := "/chronograf/v1/sources"
u := &url.URL{Path: resource}
encodedResource := u.String()
return selfLinks{
Self: fmt.Sprintf("%s/%d/%s/%s", httpAPISrcs, id, parent, encodedResource),
}
}
// NewSourceRole adds role to source
func (s *Service) NewSourceRole(w http.ResponseWriter, r *http.Request) {
var req sourceRoleRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
invalidJSON(w, s.Logger)
return
}
if err := req.ValidCreate(); err != nil {
invalidData(w, err, s.Logger)
return
}
ctx := r.Context()
srcID, ts, err := s.sourcesSeries(ctx, w, r)
if err != nil {
return
}
roles, ok := s.hasRoles(ctx, ts)
if !ok {
Error(w, http.StatusNotFound, fmt.Sprintf("Source %d does not have role capability", srcID), s.Logger)
return
}
if _, err := roles.Get(ctx, req.Name); err == nil {
Error(w, http.StatusBadRequest, fmt.Sprintf("Source %d already has role %s", srcID, req.Name), s.Logger)
return
}
res, err := roles.Add(ctx, &req.Role)
if err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
rr := newSourceRoleResponse(srcID, res)
location(w, rr.Links.Self)
encodeJSON(w, http.StatusCreated, rr, s.Logger)
}
// UpdateSourceRole changes the permissions or users of a role
func (s *Service) UpdateSourceRole(w http.ResponseWriter, r *http.Request) {
var req sourceRoleRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
invalidJSON(w, s.Logger)
return
}
if err := req.ValidUpdate(); err != nil {
invalidData(w, err, s.Logger)
return
}
ctx := r.Context()
srcID, ts, err := s.sourcesSeries(ctx, w, r)
if err != nil {
return
}
roles, ok := s.hasRoles(ctx, ts)
if !ok {
Error(w, http.StatusNotFound, fmt.Sprintf("Source %d does not have role capability", srcID), s.Logger)
return
}
rid := httprouter.GetParamFromContext(ctx, "rid")
req.Name = rid
if err := roles.Update(ctx, &req.Role); err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
role, err := roles.Get(ctx, req.Name)
if err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
rr := newSourceRoleResponse(srcID, role)
location(w, rr.Links.Self)
encodeJSON(w, http.StatusOK, rr, s.Logger)
}
// SourceRoleID retrieves a role with ID from store.
func (s *Service) SourceRoleID(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
srcID, ts, err := s.sourcesSeries(ctx, w, r)
if err != nil {
return
}
roles, ok := s.hasRoles(ctx, ts)
if !ok {
Error(w, http.StatusNotFound, fmt.Sprintf("Source %d does not have role capability", srcID), s.Logger)
return
}
rid := httprouter.GetParamFromContext(ctx, "rid")
role, err := roles.Get(ctx, rid)
if err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
rr := newSourceRoleResponse(srcID, role)
encodeJSON(w, http.StatusOK, rr, s.Logger)
}
// SourceRoles retrieves all roles from the store
func (s *Service) SourceRoles(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
srcID, ts, err := s.sourcesSeries(ctx, w, r)
if err != nil {
return
}
store, ok := s.hasRoles(ctx, ts)
if !ok {
Error(w, http.StatusNotFound, fmt.Sprintf("Source %d does not have role capability", srcID), s.Logger)
return
}
roles, err := store.All(ctx)
if err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
rr := make([]sourceRoleResponse, len(roles))
for i, role := range roles {
rr[i] = newSourceRoleResponse(srcID, &role)
}
res := struct {
Roles []sourceRoleResponse `json:"roles"`
}{rr}
encodeJSON(w, http.StatusOK, res, s.Logger)
}
// RemoveSourceRole removes role from data source.
func (s *Service) RemoveSourceRole(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
srcID, ts, err := s.sourcesSeries(ctx, w, r)
if err != nil {
return
}
roles, ok := s.hasRoles(ctx, ts)
if !ok {
Error(w, http.StatusNotFound, fmt.Sprintf("Source %d does not have role capability", srcID), s.Logger)
return
}
rid := httprouter.GetParamFromContext(ctx, "rid")
if err := roles.Delete(ctx, &chronograf.Role{Name: rid}); err != nil {
Error(w, http.StatusBadRequest, err.Error(), s.Logger)
return
}
w.WriteHeader(http.StatusNoContent)
}
// sourceRoleRequest is the format used for both creating and updating roles
type sourceRoleRequest struct {
chronograf.Role
}
func (r *sourceRoleRequest) ValidCreate() error {
if r.Name == "" || len(r.Name) > 254 {
return fmt.Errorf("Name is required for a role")
}
for _, user := range r.Users {
if user.Name == "" {
return fmt.Errorf("Username required")
}
}
return validPermissions(&r.Permissions)
}
func (r *sourceRoleRequest) ValidUpdate() error {
if len(r.Name) > 254 {
return fmt.Errorf("Username too long; must be less than 254 characters")
}
for _, user := range r.Users {
if user.Name == "" {
return fmt.Errorf("Username required")
}
}
return validPermissions(&r.Permissions)
}
type sourceRoleResponse struct {
Users []*sourceUserResponse `json:"users"`
Name string `json:"name"`
Permissions chronograf.Permissions `json:"permissions"`
Links selfLinks `json:"links"`
}
func newSourceRoleResponse(srcID int, res *chronograf.Role) sourceRoleResponse {
su := make([]*sourceUserResponse, len(res.Users))
for i := range res.Users {
name := res.Users[i].Name
su[i] = newSourceUserResponse(srcID, name)
}
if res.Permissions == nil {
res.Permissions = make(chronograf.Permissions, 0)
}
return sourceRoleResponse{
Name: res.Name,
Permissions: res.Permissions,
Users: su,
Links: newSelfLinks(srcID, "roles", res.Name),
}
}