influxdb/http/api_handler.go

271 lines
12 KiB
Go
Raw Normal View History

package http
import (
"net/http"
"github.com/go-chi/chi"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorizer"
"github.com/influxdata/influxdb/v2/chronograf/server"
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/http/metric"
"github.com/influxdata/influxdb/v2/influxql"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/prom"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/storage"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
// APIHandler is a collection of all the service handlers.
//
// It embeds chi.Router, so routes and sub-handlers are mounted directly on
// the APIHandler value.
type APIHandler struct {
chi.Router
}
// APIBackend is all services and associated parameters required to construct
// an APIHandler.
type APIBackend struct {
AssetsPath string // if empty then assets are served from bindata.
Logger *zap.Logger
influxdb.HTTPErrorHandler
SessionRenewDisabled bool
// MaxBatchSizeBytes is the maximum number of bytes which can be written
// in a single points batch
MaxBatchSizeBytes int64
// WriteParserMaxBytes specifies the maximum number of bytes that may be allocated when processing a single
// write request. A value of zero specifies there is no limit.
WriteParserMaxBytes int
// WriteParserMaxLines specifies the maximum number of lines that may be parsed when processing a single
// write request. A value of zero specifies there is no limit.
WriteParserMaxLines int
// WriteParserMaxValues specifies the maximum number of values that may be parsed when processing a single
// write request. A value of zero specifies there is no limit.
WriteParserMaxValues int
NewBucketService func(*influxdb.Source) (influxdb.BucketService, error)
NewQueryService func(*influxdb.Source) (query.ProxyQueryService, error)
WriteEventRecorder metric.EventRecorder
QueryEventRecorder metric.EventRecorder
AlgoWProxy FeatureProxyHandler
PointsWriter storage.PointsWriter
2019-04-09 10:05:42 +00:00
DeleteService influxdb.DeleteService
BackupService influxdb.BackupService
RestoreService influxdb.RestoreService
AuthorizationService influxdb.AuthorizationService
AuthorizerV1 influxdb.AuthorizerV1
2020-08-03 22:17:37 +00:00
OnboardingService influxdb.OnboardingService
DBRPService influxdb.DBRPMappingServiceV2
BucketService influxdb.BucketService
SessionService influxdb.SessionService
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
DashboardService influxdb.DashboardService
DashboardOperationLogService influxdb.DashboardOperationLogService
BucketOperationLogService influxdb.BucketOperationLogService
UserOperationLogService influxdb.UserOperationLogService
OrganizationOperationLogService influxdb.OrganizationOperationLogService
SourceService influxdb.SourceService
VariableService influxdb.VariableService
feat(kv): implemented key/value store with end-to-end integration tests * feat(kv:inmem:bolt): implement user service in a kv * refactor(kv): use consistent func receiver name * feat(kv): add initial basic auth service * refactor(passwords): move auth interface into own file * refactor(passwords): rename basic auth files to passwords * refactor(passwords): rename from BasicAuth to Passwords * refactor(kv): copy bolt user test into kv Co-authored-by: Michael Desa <mjdesa@gmail.com> * feat(kv): add inmem testing to kv store * fix(kv): remove extra user index initialization * feat(kv): attempt at making errors nice * fix(http): return not found error if filter is invalid * fix(http): s/platform/influxdb/ for user service * fix(http): s/platform/influxdb/ for user service * feat(kv): initial port of telegraf configs to kv * feat(kv): first pass at migrating bolt org service to kv * feat(kv): first pass at bucket service * feat(kv): first pass at migrating kvlog to kv package * feat(kv): add resource op logs * feat(kv): first pass at user resource mapping migration * feat(kv): add urm usage to bucket and org services * feat(kv): first pass at kv authz service * feat(kv): add cascading auth delete for users * feat(kv): first pass d authorizer.OrganizationService in kv * feat(cmd/influxd/launcher): user kv services where appropriate * fix(kv): initialize authorizations * fix(influxdb): use same buckets while slowly migrating stuff * fix(kv): make staticcheck pass * feat(kv): add dashboards to kv review: make suggestions from pr review fix: use common bucket names for bolt/kv stores * test(kv): add complete password test coverage * chore(kv): fixes for staticcheck * feat(kv): implement labels generically on kv * feat(kv): implement macro service * feat(kv): add source service * feat(kv): add session service * feat(kv): add kv secret service * refactor(kv): update telegraf and urm with error messages * feat(kv): add lookup service * feat(kv): add kv onboarding service * 
refactor(kv): update telegraf to avoid repetition * feat(cmd/influxd): use kv lookup service * feat(kv): add telegraf to lookup service * feat(cmd/influxd): use kv telegraf service * feat(kv): initial port of scrapers in bolt to kv * feat(kv): update scraper error messaging * feat(cmd/influxd): add kv scraper * feat(kv): add inmem backend tests * refactor(kv): copy paste errors * refactor(kv): add code to password errors * fix(testing): update error messages for incorrect passwords * feat(kv:inmem:bolt): implement user service in a kv * refactor(kv): use consistent func receiver name * refactor(kv): copy bolt user test into kv Co-authored-by: Michael Desa <mjdesa@gmail.com> * feat(kv): add inmem testing to kv store * fix(kv): remove extra user index initialization * feat(kv): attempt at making errors nice * fix(http): return not found error if filter is invalid * fix(http): s/platform/influxdb/ for user service * feat(kv): first pass at migrating bolt org service to kv * feat(kv): first pass at bucket service * feat(kv): first pass at migrating kvlog to kv package * feat(kv): add resource op logs * feat(kv): first pass at user resource mapping migration * feat(kv): add urm usage to bucket and org services * feat(kv): first pass at kv authz service * feat(kv): add cascading auth delete for users * feat(kv): first pass d authorizer.OrganizationService in kv * feat(cmd/influxd/launcher): user kv services where appropriate * feat(kv): add initial basic auth service * refactor(passwords): move auth interface into own file * refactor(passwords): rename basic auth files to passwords * fix(http): s/platform/influxdb/ for user service * fix(kv): initialize authorizations * fix(influxdb): use same buckets while slowly migrating stuff * fix(kv): make staticcheck pass * feat(kv): add dashboards to kv review: make suggestions from pr review fix: use common bucket names for bolt/kv stores * feat(kv): implement labels generically on kv * refactor(passwords): rename from BasicAuth 
to Passwords * test(kv): add complete password test coverage * chore(kv): fixes for staticcheck * feat(kv): implement macro service * feat(kv): add source service * feat(kv): add session service * feat(kv): initial port of telegraf configs to kv * feat(kv): initial port of scrapers in bolt to kv * feat(kv): add kv secret service * refactor(kv): update telegraf and urm with error messages * feat(kv): add lookup service * feat(kv): add kv onboarding service * refactor(kv): update telegraf to avoid repetition * feat(cmd/influxd): use kv lookup service * feat(kv): add telegraf to lookup service * feat(cmd/influxd): use kv telegraf service * feat(kv): update scraper error messaging * feat(cmd/influxd): add kv scraper * feat(kv): add inmem backend tests * refactor(kv): copy paste errors * refactor(kv): add code to password errors * fix(testing): update error messages for incorrect passwords * feat(http): initial support for flushing all key/values from kv store * feat(kv): rename macro to variable * feat(cmd/influxd/launcher): user kv services where appropriate * refactor(passwords): rename from BasicAuth to Passwords * feat(kv): implement macro service * test(ui): introduce cypress * test(ui): introduce first typescript test * test(ui/e2e): add ci job * chore: update gitignore to ignore test outputs * feat(inmem): in memory influxdb * test(e2e): adding pinger that checks if influxdb is alive * hackathon * hack * hack * hack * hack * Revert "feat(inmem): in memory influxdb" This reverts commit 30ddf032003e704643b07ce80df61c3299ea7295. 
* hack * hack * hack * hack * hack * hack * hack * hack * hack * hack * hack * hack * hack * chore: lint ignore node_modules * hack * hack * hack * add user and flush * hack * remove unused vars * hack * hack * ci(circle): prefix e2e artifacts * change test to testid * update cypress * moar testid * fix npm warnings * remove absolte path * chore(ci): remove /home/circleci proto mkdir hack * wip: crud resources e2e * fix(inmem): use inmem kv store services * test(dashboard): add first dashboard crud tests * hack * undo hack * fix: use response from setup for orgID * chore: wip * add convenience getByTitle function * test(e2e): ui can create orgs * test(e2e): add test for org deletion and update * test(e2e): introduce task creation test * test(e2e): create and update of buckets on org view * chore: move types to declaration file * chore: use route fixture in dashboard tests * chore(ci): hack back * test(ui): update snapshots * chore: package-lock * chore: remove macros * fix: launcher rebase issues * fix: compile errors * fix: compile errors * feat(cmd/influxdb): add explicit testing, asset-path, and store flags Co-authored-by: Andrew Watkins <watts@influxdb.com> * fix(cmd/influxd): set default HTTP handler and flags Co-authored-by: Andrew Watkins <watts@influxdb.com> * build(Makefile): add run-e2e and PHONY * feat(kv:inmem:bolt): implement user service in a kv * refactor(kv): use consistent func receiver name * feat(kv): add initial basic auth service * refactor(passwords): move auth interface into own file * refactor(passwords): rename basic auth files to passwords * refactor(passwords): rename from BasicAuth to Passwords * refactor(kv): copy bolt user test into kv Co-authored-by: Michael Desa <mjdesa@gmail.com> * feat(kv): add inmem testing to kv store * fix(kv): remove extra user index initialization * feat(kv): attempt at making errors nice * fix(http): return not found error if filter is invalid * fix(http): s/platform/influxdb/ for user service * fix(http): 
s/platform/influxdb/ for user service * feat(kv): initial port of telegraf configs to kv * feat(kv): initial port of scrapers in bolt to kv * feat(kv): first pass at migrating bolt org service to kv * feat(kv): first pass at bucket service * feat(kv): first pass at migrating kvlog to kv package * feat(kv): add resource op logs * feat(kv): first pass at user resource mapping migration * feat(kv): add urm usage to bucket and org services * feat(kv): first pass at kv authz service * feat(kv): add cascading auth delete for users * feat(kv): first pass d authorizer.OrganizationService in kv * feat(cmd/influxd/launcher): user kv services where appropriate * fix(kv): initialize authorizations * fix(influxdb): use same buckets while slowly migrating stuff * fix(kv): make staticcheck pass * feat(kv): add dashboards to kv review: make suggestions from pr review fix: use common bucket names for bolt/kv stores * test(kv): add complete password test coverage * chore(kv): fixes for staticcheck * feat(kv): implement labels generically on kv * feat(kv): implement macro service * feat(kv): add source service * feat(kv): add session service * feat(kv): add kv secret service * refactor(kv): update telegraf and urm with error messages * feat(kv): add lookup service * feat(kv): add kv onboarding service * refactor(kv): update telegraf to avoid repetition * feat(cmd/influxd): use kv lookup service * feat(kv): add telegraf to lookup service * feat(cmd/influxd): use kv telegraf service * feat(kv): update scraper error messaging * feat(cmd/influxd): add kv scraper * feat(kv): add inmem backend tests * refactor(kv): copy paste errors * refactor(kv): add code to password errors * fix(testing): update error messages for incorrect passwords * feat(kv): rename macro to variable * refactor(kv): auth/bucket/org/user unique checks return errors now * feat(inmem): add way to get all bucket names from store * feat(inmem): Buckets to return slice of bytes rather than strings * feat(inmem): add locks 
around Buckets to avoid races * feat(cmd/influx): check for unauthorized error in wrapCheckSetup * chore(e2e): add video and screenshot artifcats to gitignore * docs(ci): add build instructions for e2e tests * feat(kv): add id lookup for authorized resources
2019-02-19 23:47:19 +00:00
PasswordsService influxdb.PasswordsService
InfluxQLService query.ProxyQueryService
InfluxqldService influxql.ProxyQueryService
FluxService query.ProxyQueryService
FluxLanguageService influxdb.FluxLanguageService
TaskService influxdb.TaskService
2019-07-25 10:30:25 +00:00
CheckService influxdb.CheckService
TelegrafService influxdb.TelegrafConfigStore
ScraperTargetStoreService influxdb.ScraperTargetStoreService
SecretService influxdb.SecretService
LookupService influxdb.LookupService
ChronografService *server.Service
OrgLookupService authorizer.OrgIDResolver
DocumentService influxdb.DocumentService
NotificationRuleStore influxdb.NotificationRuleStore
2019-08-08 21:59:03 +00:00
NotificationEndpointService influxdb.NotificationEndpointService
Flagger feature.Flagger
FlagsHandler http.Handler
}
// PrometheusCollectors gathers the prometheus collectors exposed by the
// backend's event recorders. The write recorder is consulted first, then the
// query recorder; a recorder that does not implement prom.PrometheusCollector
// contributes nothing. The result is nil when neither recorder provides
// collectors.
func (b *APIBackend) PrometheusCollectors() []prometheus.Collector {
	var collectors []prometheus.Collector
	recorders := []metric.EventRecorder{b.WriteEventRecorder, b.QueryEventRecorder}
	for _, recorder := range recorders {
		provider, ok := recorder.(prom.PrometheusCollector)
		if !ok {
			continue
		}
		collectors = append(collectors, provider.PrometheusCollectors()...)
	}
	return collectors
}
// APIHandlerOptFn is a functional input param to set parameters on
// the APIHandler.
//
// Each option receives the handler as a chi.Router. NewAPIHandler invokes
// the supplied options, in order, after it has mounted its own routes.
type APIHandlerOptFn func(chi.Router)
// WithResourceHandler returns an option that mounts resHandler on the
// APIHandler's router at the path reported by resHandler.Prefix(). The prefix
// is read when the option is applied, not when it is created.
func WithResourceHandler(resHandler kithttp.ResourceHandler) APIHandlerOptFn {
	mount := func(router chi.Router) {
		prefix := resHandler.Prefix()
		router.Mount(prefix, resHandler)
	}
	return mount
}
// NewAPIHandler constructs all api handlers beneath it and returns an APIHandler
func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler {
h := &APIHandler{
Router: NewBaseChiRouter(kithttp.NewAPI(kithttp.WithLog(b.Logger))),
}
b.UserResourceMappingService = authorizer.NewURMService(b.OrgLookupService, b.UserResourceMappingService)
h.Mount("/api/v2", serveLinksHandler(b.HTTPErrorHandler))
checkBackend := NewCheckBackend(b.Logger.With(zap.String("handler", "check")), b)
checkBackend.CheckService = authorizer.NewCheckService(b.CheckService,
b.UserResourceMappingService, b.OrganizationService)
h.Mount(prefixChecks, NewCheckHandler(b.Logger, checkBackend))
h.Mount(prefixChronograf, NewChronografHandler(b.ChronografService, b.HTTPErrorHandler))
deleteBackend := NewDeleteBackend(b.Logger.With(zap.String("handler", "delete")), b)
h.Mount(prefixDelete, NewDeleteHandler(b.Logger, deleteBackend))
documentBackend := NewDocumentBackend(b.Logger.With(zap.String("handler", "document")), b)
documentBackend.DocumentService = authorizer.NewDocumentService(b.DocumentService)
h.Mount(prefixDocuments, NewDocumentHandler(documentBackend))
fluxBackend := NewFluxBackend(b.Logger.With(zap.String("handler", "query")), b)
h.Mount(prefixQuery, NewFluxHandler(b.Logger, fluxBackend))
notificationEndpointBackend := NewNotificationEndpointBackend(b.Logger.With(zap.String("handler", "notificationEndpoint")), b)
notificationEndpointBackend.NotificationEndpointService = authorizer.NewNotificationEndpointService(b.NotificationEndpointService,
b.UserResourceMappingService, b.OrganizationService)
h.Mount(prefixNotificationEndpoints, NewNotificationEndpointHandler(notificationEndpointBackend.Logger(), notificationEndpointBackend))
notificationRuleBackend := NewNotificationRuleBackend(b.Logger.With(zap.String("handler", "notification_rule")), b)
notificationRuleBackend.NotificationRuleStore = authorizer.NewNotificationRuleStore(b.NotificationRuleStore,
b.UserResourceMappingService, b.OrganizationService)
h.Mount(prefixNotificationRules, NewNotificationRuleHandler(b.Logger, notificationRuleBackend))
scraperBackend := NewScraperBackend(b.Logger.With(zap.String("handler", "scraper")), b)
2019-04-12 16:45:48 +00:00
scraperBackend.ScraperStorageService = authorizer.NewScraperTargetStoreService(b.ScraperTargetStoreService,
b.UserResourceMappingService,
b.OrganizationService)
h.Mount(prefixTargets, NewScraperHandler(b.Logger, scraperBackend))
sourceBackend := NewSourceBackend(b.Logger.With(zap.String("handler", "source")), b)
sourceBackend.SourceService = authorizer.NewSourceService(b.SourceService)
sourceBackend.BucketService = authorizer.NewBucketService(b.BucketService)
h.Mount(prefixSources, NewSourceHandler(b.Logger, sourceBackend))
h.Mount("/api/v2/swagger.json", newSwaggerLoader(b.Logger.With(zap.String("service", "swagger-loader")), b.HTTPErrorHandler))
taskLogger := b.Logger.With(zap.String("handler", "bucket"))
taskBackend := NewTaskBackend(taskLogger, b)
taskBackend.TaskService = authorizer.NewTaskService(taskLogger, b.TaskService)
taskHandler := NewTaskHandler(b.Logger, taskBackend)
h.Mount(prefixTasks, taskHandler)
telegrafBackend := NewTelegrafBackend(b.Logger.With(zap.String("handler", "telegraf")), b)
telegrafBackend.TelegrafService = authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService)
h.Mount(prefixTelegrafPlugins, NewTelegrafHandler(b.Logger, telegrafBackend))
h.Mount(prefixTelegraf, NewTelegrafHandler(b.Logger, telegrafBackend))
2018-10-24 15:13:30 +00:00
h.Mount("/api/v2/flags", b.FlagsHandler)
variableBackend := NewVariableBackend(b.Logger.With(zap.String("handler", "variable")), b)
variableBackend.VariableService = authorizer.NewVariableService(b.VariableService)
h.Mount(prefixVariables, NewVariableHandler(b.Logger, variableBackend))
2019-07-25 10:30:25 +00:00
backupBackend := NewBackupBackend(b)
backupBackend.BackupService = authorizer.NewBackupService(backupBackend.BackupService)
h.Mount(prefixBackup, NewBackupHandler(backupBackend))
restoreBackend := NewRestoreBackend(b)
restoreBackend.RestoreService = authorizer.NewRestoreService(restoreBackend.RestoreService)
h.Mount(prefixRestore, NewRestoreHandler(restoreBackend))
h.Mount(dbrp.PrefixDBRP, dbrp.NewHTTPHandler(b.Logger, b.DBRPService, b.OrganizationService))
writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b)
h.Mount(prefixWrite, NewWriteHandler(b.Logger, writeBackend,
WithMaxBatchSizeBytes(b.MaxBatchSizeBytes),
2020-07-23 17:28:57 +00:00
//WithParserOptions(
// models.WithParserMaxBytes(b.WriteParserMaxBytes),
// models.WithParserMaxLines(b.WriteParserMaxLines),
// models.WithParserMaxValues(b.WriteParserMaxValues),
//),
))
for _, o := range opts {
o(h)
}
return h
}
// apiLinks is the document served by serveLinksHandler: it maps a resource
// name to the path under which that resource is available. Most values are
// plain path strings; "external", "query" and "system" hold nested
// name-to-location maps.
var apiLinks = map[string]interface{}{
	// when adding new links, please take care to keep this list alphabetical
	// as this makes it easier to verify values against the swagger document.
	"authorizations": "/api/v2/authorizations",
	"backup":         "/api/v2/backup",
	"buckets":        "/api/v2/buckets",
	"checks":         "/api/v2/checks",
	"dashboards":     "/api/v2/dashboards",
	"delete":         "/api/v2/delete",
	"external": map[string]string{
		"statusFeed": "https://www.influxdata.com/feed/json",
	},
	"flags":                 "/api/v2/flags",
	"labels":                "/api/v2/labels",
	"me":                    "/api/v2/me",
	"notificationEndpoints": "/api/v2/notificationEndpoints",
	"notificationRules":     "/api/v2/notificationRules",
	"orgs":                  "/api/v2/orgs",
	"plugins":               "/api/v2/telegraf/plugins",
	"query": map[string]string{
		"analyze":     "/api/v2/query/analyze",
		"ast":         "/api/v2/query/ast",
		"self":        "/api/v2/query",
		"suggestions": "/api/v2/query/suggestions",
	},
	"restore":  "/api/v2/restore",
	"scrapers": "/api/v2/scrapers",
	"setup":    "/api/v2/setup",
	"signin":   "/api/v2/signin",
	"signout":  "/api/v2/signout",
	"sources":  "/api/v2/sources",
	"swagger":  "/api/v2/swagger.json",
	"system": map[string]string{
		"debug":   "/debug/pprof",
		"health":  "/health",
		"metrics": "/metrics",
	},
	"tasks":     "/api/v2/tasks",
	"telegrafs": "/api/v2/telegrafs",
	"users":     "/api/v2/users",
	"variables": "/api/v2/variables",
	"write":     "/api/v2/write",
}
// serveLinksHandler returns an http.Handler that encodes apiLinks into the
// response with status 200 OK. If encoding fails, the error is handed to
// errorHandler together with the request context and the response writer.
func serveLinksHandler(errorHandler influxdb.HTTPErrorHandler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		err := encodeResponse(ctx, w, http.StatusOK, apiLinks)
		if err == nil {
			return
		}
		errorHandler.HandleHTTPError(ctx, err, w)
	})
}