From abc20e5e05cc4df07cf12cb9b590ebd947a096ed Mon Sep 17 00:00:00 2001 From: Michael Desa Date: Wed, 1 Aug 2018 17:50:57 -0400 Subject: [PATCH] feat(platform): add chronograf language server to platform binary Co-authored-by: Andrew Watkins Co-authored-by: Michael Desa --- chronograf/server/flux.go | 110 --------------------------- chronograf/server/mux.go | 6 -- cmd/idpd/main.go | 2 + http/chronograf_handler.go | 6 -- http/flux_lang.go | 148 +++++++++++++++++++++++++++++++++++++ http/platform_handler.go | 25 ++++++- 6 files changed, 174 insertions(+), 123 deletions(-) delete mode 100644 chronograf/server/flux.go create mode 100644 http/flux_lang.go diff --git a/chronograf/server/flux.go b/chronograf/server/flux.go deleted file mode 100644 index ba839a228a..0000000000 --- a/chronograf/server/flux.go +++ /dev/null @@ -1,110 +0,0 @@ -package server - -import ( - "encoding/json" - "fmt" - "net/http" - - "github.com/bouk/httprouter" - "github.com/influxdata/platform/query/complete" - "github.com/influxdata/platform/query/parser" -) - -type Params map[string]string - -// SuggestionsResponse provides a list of available Flux functions -type SuggestionsResponse struct { - Functions []SuggestionResponse `json:"funcs"` -} - -// SuggestionResponse provides the parameters available for a given Flux function -type SuggestionResponse struct { - Name string `json:"name"` - Params Params `json:"params"` -} - -type fluxLinks struct { - Self string `json:"self"` // Self link mapping to this resource - Suggestions string `json:"suggestions"` // URL for flux builder function suggestions -} - -type fluxResponse struct { - Links fluxLinks `json:"links"` -} - -// Flux returns a list of links for the Flux API -func (s *Service) Flux(w http.ResponseWriter, r *http.Request) { - httpAPIFlux := "/chronograf/v1/flux" - res := fluxResponse{ - Links: fluxLinks{ - Self: fmt.Sprintf("%s", httpAPIFlux), - Suggestions: fmt.Sprintf("%s/suggestions", httpAPIFlux), - }, - } - - encodeJSON(w, http.StatusOK, res, s.Logger) -} - -// FluxSuggestions returns a list of available Flux functions for the Flux Builder -func (s *Service) FluxSuggestions(w http.ResponseWriter, r *http.Request) { - completer := complete.DefaultCompleter() - names := completer.FunctionNames() - var functions []SuggestionResponse - for _, name := range names { - suggestion, err := completer.FunctionSuggestion(name) - if err != nil { - Error(w, http.StatusNotFound, err.Error(), s.Logger) - return - } - - filteredParams := make(Params) - for key, value := range suggestion.Params { - if key == "table" { - continue - } - - filteredParams[key] = value - } - - functions = append(functions, SuggestionResponse{ - Name: name, - Params: filteredParams, - }) - } - res := SuggestionsResponse{Functions: functions} - - encodeJSON(w, http.StatusOK, res, s.Logger) -} - -// FluxSuggestion returns the function parameters for the requested function -func (s *Service) FluxSuggestion(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - name := httprouter.GetParamFromContext(ctx, "name") - completer := complete.DefaultCompleter() - - suggestion, err := completer.FunctionSuggestion(name) - if err != nil { - Error(w, http.StatusNotFound, err.Error(), s.Logger) - } - - encodeJSON(w, http.StatusOK, SuggestionResponse{Name: name, Params: suggestion.Params}, s.Logger) -} - -type ASTRequest struct { - Body string `json:"body"` -} - -func (s *Service) FluxAST(w http.ResponseWriter, r *http.Request) { - var request ASTRequest - err := json.NewDecoder(r.Body).Decode(&request) - if err != nil { - invalidJSON(w, s.Logger) - } - - ast, err := parser.NewAST(request.Body) - if err != nil { - Error(w, http.StatusInternalServerError, err.Error(), s.Logger) - } - - encodeJSON(w, http.StatusOK, ast, s.Logger) -} diff --git a/chronograf/server/mux.go b/chronograf/server/mux.go index 6004717c37..f24800e6a2 100644 --- a/chronograf/server/mux.go +++ b/chronograf/server/mux.go @@ -165,12 +165,6 @@ func NewMux(opts MuxOpts, service Service) http.Handler { router.DELETE("/chronograf/v1/sources/:id", EnsureEditor(service.RemoveSource)) router.GET("/chronograf/v1/sources/:id/health", EnsureViewer(service.SourceHealth)) - // Flux - router.GET("/chronograf/v1/flux", EnsureViewer(service.Flux)) - router.POST("/chronograf/v1/flux/ast", EnsureViewer(service.FluxAST)) - router.GET("/chronograf/v1/flux/suggestions", EnsureViewer(service.FluxSuggestions)) - router.GET("/chronograf/v1/flux/suggestions/:name", EnsureViewer(service.FluxSuggestion)) - // Source Proxy to Influx; Has gzip compression around the handler influx := gziphandler.GzipHandler(http.HandlerFunc(EnsureViewer(service.Influx))) router.Handler("POST", "/chronograf/v1/sources/:id/proxy", influx) diff --git a/cmd/idpd/main.go b/cmd/idpd/main.go index 78239b935f..255c6c300f 100644 --- a/cmd/idpd/main.go +++ b/cmd/idpd/main.go @@ -183,6 +183,7 @@ func platformF(cmd *cobra.Command, args []string) { authHandler.Logger = logger.With(zap.String("handler", "auth")) assetHandler := http.NewAssetHandler() + fluxLangHandler := http.NewFluxLangHandler() sourceHandler := http.NewSourceHandler() sourceHandler.SourceService = sourceSvc @@ -200,6 +201,7 @@ func platformF(cmd *cobra.Command, args []string) { AuthorizationHandler: authHandler, DashboardHandler: dashboardHandler, AssetHandler: assetHandler, + FluxLangHandler: fluxLangHandler, ChronografHandler: chronografHandler, SourceHandler: sourceHandler, TaskHandler: taskHandler, diff --git a/http/chronograf_handler.go b/http/chronograf_handler.go index 86924bed96..bcac56e833 100644 --- a/http/chronograf_handler.go +++ b/http/chronograf_handler.go @@ -45,12 +45,6 @@ func NewChronografHandler(s *server.Service) *ChronografHandler { h.HandlerFunc("DELETE", "/chronograf/v1/sources/:id", h.Service.RemoveSource) h.HandlerFunc("GET", "/chronograf/v1/sources/:id/health", h.Service.SourceHealth) - // Flux - h.HandlerFunc("GET", "/chronograf/v1/flux", h.Service.Flux) - h.HandlerFunc("POST", "/chronograf/v1/flux/ast", h.Service.FluxAST) - h.HandlerFunc("GET", "/chronograf/v1/flux/suggestions", h.Service.FluxSuggestions) - h.HandlerFunc("GET", "/chronograf/v1/flux/suggestions/:name", h.Service.FluxSuggestion) - // Source Proxy to Influx; Has gzip compression around the handler influx := gziphandler.GzipHandler(http.HandlerFunc(h.Service.Influx)) h.Handler("POST", "/chronograf/v1/sources/:id/proxy", influx) diff --git a/http/flux_lang.go b/http/flux_lang.go new file mode 100644 index 0000000000..47354cf91d --- /dev/null +++ b/http/flux_lang.go @@ -0,0 +1,148 @@ +package http + +import ( + "encoding/json" + "net/http" + + query "github.com/desa/platform/query/builtin" + "github.com/influxdata/platform/query/parser" + "github.com/julienschmidt/httprouter" +) + +// FluxLangHandler represents an HTTP API handler for buckets. +type FluxLangHandler struct { + *httprouter.Router +} + +type astRequest struct { + Body string `json:"body"` +} + +// NewFluxLangHandler returns a new instance of FluxLangHandler. +func NewFluxLangHandler() *FluxLangHandler { + h := &FluxLangHandler{ + Router: httprouter.New(), + } + + h.HandlerFunc("GET", "/v2/flux", h.getFlux) + h.HandlerFunc("POST", "/v2/flux/ast", h.postFluxAST) + h.HandlerFunc("GET", "/v2/flux/suggestions", h.getFluxSuggestions) + h.HandlerFunc("GET", "/v2/flux/suggestions/:name", h.getFluxSuggestion) + return h +} + +// fluxParams contain flux funciton parameters as defined by the semantic graph +type fluxParams map[string]string + +// suggestionsResponse provides a list of available Flux functions +type suggestionsResponse struct { + Functions []suggestionResponse `json:"funcs"` +} + +// suggestionResponse provides the parameters available for a given Flux function +type suggestionResponse struct { + Name string `json:"name"` + Params fluxParams `json:"params"` +} + +type fluxLinks struct { + Self string `json:"self"` // Self link mapping to this resource + Suggestions string `json:"suggestions"` // URL for flux builder function suggestions +} + +type fluxResponse struct { + Links fluxLinks `json:"links"` +} + +var getFluxResponse = fluxResponse{ + Links: fluxLinks{ + Self: "/v2/flux", + Suggestions: "/v2/flux/suggestions", + }, +} + +// getFlux returns a list of links for the Flux API +func (h *FluxLangHandler) getFlux(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + if err := encodeResponse(ctx, w, http.StatusOK, getFluxResponse); err != nil { + EncodeError(ctx, err, w) + return + } +} + +// postFluxAST returns a flux AST for provided flux string +func (h *FluxLangHandler) postFluxAST(w http.ResponseWriter, r *http.Request) { + var request astRequest + ctx := r.Context() + + err := json.NewDecoder(r.Body).Decode(&request) + if err != nil { + EncodeError(ctx, err, w) + return + } + + ast, err := parser.NewAST(request.Body) + if err != nil { + EncodeError(ctx, err, w) + return + } + + if err := encodeResponse(ctx, w, http.StatusOK, ast); err != nil { + EncodeError(ctx, err, w) + return + } +} + +// getFluxSuggestions returns a list of available Flux functions for the Flux Builder +func (h *FluxLangHandler) getFluxSuggestions(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + completer := query.DefaultCompleter() + names := completer.FunctionNames() + var functions []suggestionResponse + for _, name := range names { + suggestion, err := completer.FunctionSuggestion(name) + if err != nil { + EncodeError(ctx, err, w) + return + } + + filteredParams := make(fluxParams) + for key, value := range suggestion.Params { + if key == "table" { + continue + } + + filteredParams[key] = value + } + + functions = append(functions, suggestionResponse{ + Name: name, + Params: filteredParams, + }) + } + res := suggestionsResponse{Functions: functions} + + if err := encodeResponse(ctx, w, http.StatusOK, res); err != nil { + EncodeError(ctx, err, w) + return + } +} + +// getFluxSuggestion returns the function parameters for the requested function +func (h *FluxLangHandler) getFluxSuggestion(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + name := httprouter.ParamsFromContext(ctx).ByName("name") + completer := query.DefaultCompleter() + + suggestion, err := completer.FunctionSuggestion(name) + if err != nil { + EncodeError(ctx, err, w) + return + } + + res := suggestionResponse{Name: name, Params: suggestion.Params} + if err := encodeResponse(ctx, w, http.StatusOK, res); err != nil { + EncodeError(ctx, err, w) + return + } +} diff --git a/http/platform_handler.go b/http/platform_handler.go index 01f5c177c2..be4f08862d 100644 --- a/http/platform_handler.go +++ b/http/platform_handler.go @@ -2,6 +2,8 @@ package http import ( "context" + "encoding/json" + "fmt" nethttp "net/http" "strings" @@ -20,6 +22,7 @@ type PlatformHandler struct { ChronografHandler *ChronografHandler SourceHandler *SourceHandler TaskHandler *TaskHandler + FluxLangHandler *FluxLangHandler } func setCORSResponseHeaders(w nethttp.ResponseWriter, r *nethttp.Request) { @@ -32,6 +35,11 @@ func setCORSResponseHeaders(w nethttp.ResponseWriter, r *nethttp.Request) { var platformLinks = map[string]interface{}{ "sources": "/v2/sources", + "flux": map[string]string{ + "self": "/v2/flux", + "ast": "/v2/flux/ast", + "suggestions": "/v2/flux/suggestions", + }, } func (h *PlatformHandler) serveLinks(w nethttp.ResponseWriter, r *nethttp.Request) { @@ -59,11 +67,17 @@ func (h *PlatformHandler) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request return } - if r.URL.Path == "/v2" { + // Serve the links base links for the API. + if r.URL.Path == "/v2/" || r.URL.Path == "/v2" { h.serveLinks(w, r) return } + if strings.HasPrefix(r.URL.Path, "/v2/flux") { + h.FluxLangHandler.ServeHTTP(w, r) + return + } + if strings.HasPrefix(r.URL.Path, "/chronograf/") { h.ChronografHandler.ServeHTTP(w, r) return @@ -127,3 +141,12 @@ func extractAuthorization(ctx context.Context, r *nethttp.Request) (context.Cont } return idpctx.SetToken(ctx, t), nil } + +func mustMarshalJSON(i interface{}) []byte { + b, err := json.Marshal(i) + if err != nil { + panic(fmt.Sprintf("failed to marshal json: %v", err)) + } + + return b +}