174 lines
4.7 KiB
Go
174 lines
4.7 KiB
Go
package main
|
|
|
|
import (
|
|
"log"
|
|
"math/rand"
|
|
"net/http"
|
|
"net/http/pprof"
|
|
"net/url"
|
|
"strings"
|
|
|
|
"github.com/influxdb/influxdb"
|
|
"github.com/influxdb/influxdb/httpd"
|
|
"github.com/influxdb/influxdb/messaging"
|
|
"github.com/influxdb/influxdb/raft"
|
|
)
|
|
|
|
// Handler represents an HTTP handler for InfluxDB node.
|
|
// Depending on its role, it will serve many different endpoints.
|
|
type Handler struct {
|
|
Log *raft.Log
|
|
Broker *influxdb.Broker
|
|
Server *influxdb.Server
|
|
Config *Config
|
|
}
|
|
|
|
// NewHandler returns a new instance of Handler.
|
|
func NewHandler() *Handler {
|
|
return &Handler{}
|
|
}
|
|
|
|
// ServeHTTP responds to HTTP request to the handler.
|
|
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
// Debug routes.
|
|
if h.Config.Debugging.PprofEnabled && strings.HasPrefix(r.URL.Path, "/debug/pprof") {
|
|
switch r.URL.Path {
|
|
case "/debug/pprof/cmdline":
|
|
pprof.Cmdline(w, r)
|
|
case "/debug/pprof/profile":
|
|
pprof.Profile(w, r)
|
|
case "/debug/pprof/symbol":
|
|
pprof.Symbol(w, r)
|
|
default:
|
|
pprof.Index(w, r)
|
|
}
|
|
return
|
|
}
|
|
|
|
// FIXME: This is very brittle. Refactor to have common path prefix
|
|
if strings.HasPrefix(r.URL.Path, "/raft") {
|
|
h.serveRaft(w, r)
|
|
return
|
|
}
|
|
|
|
if strings.HasPrefix(r.URL.Path, "/messaging") {
|
|
h.serveMessaging(w, r)
|
|
return
|
|
}
|
|
|
|
if strings.HasPrefix(r.URL.Path, "/data_nodes") ||
|
|
strings.HasPrefix(r.URL.Path, "/process_continuous_queries") ||
|
|
strings.HasPrefix(r.URL.Path, "/run_mapper") ||
|
|
strings.HasPrefix(r.URL.Path, "/metastore") {
|
|
h.serveMetadata(w, r)
|
|
return
|
|
}
|
|
h.serveData(w, r)
|
|
}
|
|
|
|
// serveMessaging responds to broker requests
|
|
func (h *Handler) serveMessaging(w http.ResponseWriter, r *http.Request) {
|
|
if h.Broker == nil && h.Server == nil {
|
|
log.Println("no broker or server configured to handle messaging endpoints")
|
|
http.Error(w, "server unavailable", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
|
|
// If we're running a broker, handle the broker endpoints
|
|
if h.Broker != nil {
|
|
mh := &messaging.Handler{
|
|
Broker: h.Broker.Broker,
|
|
RaftHandler: &raft.Handler{Log: h.Log},
|
|
}
|
|
mh.ServeHTTP(w, r)
|
|
return
|
|
}
|
|
|
|
// Redirect to a valid broker to handle the request
|
|
h.redirect(h.Server.BrokerURLs(), w, r)
|
|
}
|
|
|
|
// serveMetadata responds to broker requests
|
|
func (h *Handler) serveMetadata(w http.ResponseWriter, r *http.Request) {
|
|
if h.Broker == nil && h.Server == nil {
|
|
log.Println("no broker or server configured to handle metadata endpoints")
|
|
http.Error(w, "server unavailable", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
|
|
if h.Server != nil {
|
|
sh := httpd.NewClusterHandler(h.Server, h.Config.Authentication.Enabled, version)
|
|
sh.WriteTrace = h.Config.Logging.WriteTracing
|
|
sh.ServeHTTP(w, r)
|
|
return
|
|
}
|
|
|
|
t := h.Broker.Topic(influxdb.BroadcastTopicID)
|
|
if t == nil {
|
|
http.Error(w, "not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// Redirect to a valid data URL to handle the request
|
|
h.redirect(h.Broker.Topic(influxdb.BroadcastTopicID).DataURLs(), w, r)
|
|
}
|
|
|
|
// serveRaft responds to raft requests.
|
|
func (h *Handler) serveRaft(w http.ResponseWriter, r *http.Request) {
|
|
if h.Log == nil && h.Server == nil {
|
|
log.Println("no broker or server configured to handle raft endpoints")
|
|
http.Error(w, "server unavailable", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
|
|
if h.Log != nil {
|
|
rh := raft.Handler{Log: h.Log}
|
|
rh.ServeHTTP(w, r)
|
|
return
|
|
}
|
|
|
|
// Redirect to a valid broker to handle the request
|
|
h.redirect(h.Server.BrokerURLs(), w, r)
|
|
}
|
|
|
|
// serveData responds to data requests
|
|
func (h *Handler) serveData(w http.ResponseWriter, r *http.Request) {
|
|
if h.Broker == nil && h.Server == nil {
|
|
log.Println("no broker or server configured to handle data endpoints")
|
|
http.Error(w, "server unavailable", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
|
|
if h.Server != nil {
|
|
sh := httpd.NewAPIHandler(h.Server, h.Config.Authentication.Enabled, version)
|
|
sh.WriteTrace = h.Config.Logging.WriteTracing
|
|
sh.ServeHTTP(w, r)
|
|
return
|
|
}
|
|
|
|
t := h.Broker.Topic(influxdb.BroadcastTopicID)
|
|
if t == nil {
|
|
http.Error(w, "not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// Redirect to a valid data URL to handle the request
|
|
h.redirect(h.Broker.Topic(influxdb.BroadcastTopicID).DataURLs(), w, r)
|
|
}
|
|
|
|
// redirect redirects a request to URL in u. If u is an empty slice,
|
|
// a 503 is returned
|
|
func (h *Handler) redirect(u []url.URL, w http.ResponseWriter, r *http.Request) {
|
|
// No valid URLs, return an error
|
|
if len(u) == 0 {
|
|
http.Error(w, "service unavailable", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
|
|
// TODO: Log to internal stats to track how frequently this is happening. If
|
|
// this is happening frequently, the clients are using a suboptimal endpoint
|
|
|
|
// Redirect the client to a valid data node that can handle the request
|
|
http.Redirect(w, r, u[rand.Intn(len(u))].String()+r.RequestURI, http.StatusTemporaryRedirect)
|
|
}
|