refactoring the refactoring
parent
def4590fca
commit
fd626de462
|
@ -67,7 +67,7 @@ func execRun(args []string) {
|
|||
|
||||
// Start the server handler. Attach to broker if listening on the same port.
|
||||
if s != nil {
|
||||
sh := httpd.NewHandler(s, config.Authentication.Enabled)
|
||||
sh := httpd.NewHandler(s, config.Authentication.Enabled, version)
|
||||
if h != nil && config.BrokerAddr() == config.DataAddr() {
|
||||
h.serverHandler = sh
|
||||
} else {
|
||||
|
@ -211,7 +211,7 @@ func openServer(path string, u *url.URL, b *messaging.Broker, initializing, conf
|
|||
}
|
||||
|
||||
// Create and open the server.
|
||||
s := influxdb.NewServer(version)
|
||||
s := influxdb.NewServer()
|
||||
if err := s.Open(path); err != nil {
|
||||
log.Fatalf("failed to open data server: %v", err.Error())
|
||||
}
|
||||
|
|
|
@ -13,10 +13,12 @@ import (
|
|||
"github.com/influxdb/influxdb"
|
||||
)
|
||||
|
||||
// basicAuthCredentials returns the username and password encoded in
|
||||
// parseCredentials returns the username and password encoded in
|
||||
// a request. The credentials may be present as URL query params, or as
|
||||
// a Basic Authentication header.
|
||||
func basicAuthCredentials(r *http.Request) (string, string, error) {
|
||||
// as params: http://127.0.0.1/query?u=username&p=password
|
||||
// as basic auth: http://username:password@127.0.0.1
|
||||
func parseCredentials(r *http.Request) (string, string, error) {
|
||||
q := r.URL.Query()
|
||||
username, password := q.Get("u"), q.Get("p")
|
||||
if username != "" && password != "" {
|
||||
|
@ -41,18 +43,18 @@ func basicAuthCredentials(r *http.Request) (string, string, error) {
|
|||
return fields[0], fields[1], nil
|
||||
}
|
||||
|
||||
// authorize ensures that if user credentials are passed in, an attempt is made to authenticate that user.
|
||||
// If authentication fails, an error is returned to the user.
|
||||
// authorize wraps a handler and ensures that if user credentials are passed in
|
||||
// an attempt is made to authenticate that user. If authentication fails, an error is returned.
|
||||
//
|
||||
// There is one exception: if there are no users in the system, authentication is not required. This
|
||||
// is to facilitate bootstrapping of a system with authentication enabled.
|
||||
func authorize(inner http.Handler, h *Handler, requireAuthentication bool) http.Handler {
|
||||
func authorize(inner func(http.ResponseWriter, *http.Request, *influxdb.User), h *Handler, requireAuthentication bool) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var user *influxdb.User
|
||||
|
||||
// TODO corylanou: never allow this in the future without users
|
||||
if requireAuthentication && h.server.UserCount() > 0 {
|
||||
username, password, err := basicAuthCredentials(r)
|
||||
username, password, err := parseCredentials(r)
|
||||
if err != nil {
|
||||
httpError(w, err.Error(), http.StatusUnauthorized)
|
||||
return
|
||||
|
@ -68,7 +70,13 @@ func authorize(inner http.Handler, h *Handler, requireAuthentication bool) http.
|
|||
return
|
||||
}
|
||||
}
|
||||
h.user = user
|
||||
inner(w, r, user)
|
||||
})
|
||||
}
|
||||
|
||||
func versionHeader(inner http.Handler, version string) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Add("X-Influxdb-Version", version)
|
||||
inner.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ type route struct {
|
|||
name string
|
||||
method string
|
||||
pattern string
|
||||
handlerFunc http.HandlerFunc
|
||||
handlerFunc func(http.ResponseWriter, *http.Request, *influxdb.User)
|
||||
}
|
||||
|
||||
// Handler represents an HTTP handler for the InfluxDB server.
|
||||
|
@ -34,12 +34,11 @@ type Handler struct {
|
|||
server *influxdb.Server
|
||||
routes []route
|
||||
mux *pat.PatternServeMux
|
||||
user *influxdb.User
|
||||
requireAuthentication bool
|
||||
}
|
||||
|
||||
// NewHandler returns a new instance of Handler.
|
||||
func NewHandler(s *influxdb.Server, requireAuthentication bool) *Handler {
|
||||
func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) *Handler {
|
||||
h := &Handler{
|
||||
server: s,
|
||||
mux: pat.New(),
|
||||
|
@ -80,10 +79,9 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool) *Handler {
|
|||
)
|
||||
|
||||
for _, r := range h.routes {
|
||||
var handler http.Handler
|
||||
|
||||
handler = r.handlerFunc
|
||||
handler = authorize(handler, h, requireAuthentication)
|
||||
handler := authorize(r.handlerFunc, h, requireAuthentication)
|
||||
handler = versionHeader(handler, version)
|
||||
handler = cors(handler)
|
||||
handler = requestID(handler)
|
||||
handler = logging(handler, r.name, weblog)
|
||||
|
@ -97,12 +95,11 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool) *Handler {
|
|||
|
||||
//ServeHTTP responds to HTTP request to the handler.
|
||||
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Add("X-Influxdb-Version", h.server.Version())
|
||||
h.mux.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
// serveQuery parses an incoming query and, if valid, executes the query.
|
||||
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
|
||||
q := r.URL.Query()
|
||||
p := influxql.NewParser(strings.NewReader(q.Get("q")))
|
||||
db := q.Get("db")
|
||||
|
@ -125,7 +122,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// Execute query. One result will return for each statement.
|
||||
results := h.server.ExecuteQuery(query, db, h.user)
|
||||
results := h.server.ExecuteQuery(query, db, user)
|
||||
|
||||
// Send results to client.
|
||||
httpResults(w, results)
|
||||
|
@ -140,7 +137,7 @@ type batchWrite struct {
|
|||
}
|
||||
|
||||
// serveWrite receives incoming series data and writes it to the database.
|
||||
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
|
||||
var br batchWrite
|
||||
|
||||
dec := json.NewDecoder(r.Body)
|
||||
|
@ -173,8 +170,8 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if h.requireAuthentication && !h.user.Authorize(influxql.WritePrivilege, br.Database) {
|
||||
writeError(influxdb.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", h.user.Name)}, http.StatusUnauthorized)
|
||||
if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, br.Database) {
|
||||
writeError(influxdb.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name)}, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -198,7 +195,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// serveMetastore returns a copy of the metastore.
|
||||
func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
|
||||
// Set headers.
|
||||
w.Header().Set("Content-Type", "application/octet-stream")
|
||||
w.Header().Set("Content-Disposition", `attachment; filename="meta"`)
|
||||
|
@ -209,12 +206,12 @@ func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// servePing returns a simple response to let the client know the server is running.
|
||||
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
// serveDataNodes returns a list of all data nodes in the cluster.
|
||||
func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
|
||||
// Generate a list of objects for encoding to the API.
|
||||
a := make([]*dataNodeJSON, 0)
|
||||
for _, n := range h.server.DataNodes() {
|
||||
|
@ -229,7 +226,7 @@ func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// serveCreateDataNode creates a new data node in the cluster.
|
||||
func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
|
||||
// Read in data node from request body.
|
||||
var n dataNodeJSON
|
||||
if err := json.NewDecoder(r.Body).Decode(&n); err != nil {
|
||||
|
@ -269,7 +266,7 @@ func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// serveDeleteDataNode removes an existing node.
|
||||
func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
|
||||
// Parse node id.
|
||||
nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64)
|
||||
if err != nil {
|
||||
|
|
|
@ -787,12 +787,12 @@ type HTTPServer struct {
|
|||
}
|
||||
|
||||
func NewHTTPServer(s *Server) *HTTPServer {
|
||||
h := httpd.NewHandler(s.Server, false)
|
||||
h := httpd.NewHandler(s.Server, false, "X.X")
|
||||
return &HTTPServer{httptest.NewServer(h), h}
|
||||
}
|
||||
|
||||
func NewAuthenticatedHTTPServer(s *Server) *HTTPServer {
|
||||
h := httpd.NewHandler(s.Server, true)
|
||||
h := httpd.NewHandler(s.Server, true, "X.X")
|
||||
return &HTTPServer{httptest.NewServer(h), h}
|
||||
}
|
||||
|
||||
|
@ -807,7 +807,7 @@ type Server struct {
|
|||
|
||||
// NewServer returns a new test server instance.
|
||||
func NewServer() *Server {
|
||||
return &Server{influxdb.NewServer("X.X")}
|
||||
return &Server{influxdb.NewServer()}
|
||||
}
|
||||
|
||||
// OpenServer returns a new, open test server instance.
|
||||
|
|
|
@ -66,6 +66,16 @@ func buildLogLine(l *responseLogger, r *http.Request, start time.Time) string {
|
|||
|
||||
uri := r.URL.RequestURI()
|
||||
|
||||
referer := r.Referer()
|
||||
if referer == "" {
|
||||
referer = "-"
|
||||
}
|
||||
|
||||
userAgent := r.UserAgent()
|
||||
if userAgent == "" {
|
||||
userAgent = "-"
|
||||
}
|
||||
|
||||
fields := []string{
|
||||
host,
|
||||
"-",
|
||||
|
@ -76,8 +86,8 @@ func buildLogLine(l *responseLogger, r *http.Request, start time.Time) string {
|
|||
r.Proto,
|
||||
strconv.Itoa(l.Status()),
|
||||
strconv.Itoa(l.Size()),
|
||||
r.Referer(),
|
||||
r.UserAgent(),
|
||||
referer,
|
||||
userAgent,
|
||||
r.Header.Get("Request-Id"),
|
||||
}
|
||||
|
||||
|
|
10
server.go
10
server.go
|
@ -92,12 +92,10 @@ type Server struct {
|
|||
databases map[string]*database // databases by name
|
||||
shards map[uint64]*Shard // shards by id
|
||||
users map[string]*User // user by name
|
||||
|
||||
version string // current version of the server
|
||||
}
|
||||
|
||||
// NewServer returns a new instance of Server.
|
||||
func NewServer(version string) *Server {
|
||||
func NewServer() *Server {
|
||||
return &Server{
|
||||
meta: &metastore{},
|
||||
errors: make(map[uint64]error),
|
||||
|
@ -105,7 +103,6 @@ func NewServer(version string) *Server {
|
|||
databases: make(map[string]*database),
|
||||
shards: make(map[uint64]*Shard),
|
||||
users: make(map[string]*User),
|
||||
version: version,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -117,11 +114,6 @@ func (s *Server) ID() uint64 {
|
|||
return s.id
|
||||
}
|
||||
|
||||
// Version returns the current version of the server
|
||||
func (s *Server) Version() string {
|
||||
return s.version
|
||||
}
|
||||
|
||||
// Path returns the path used when opening the server.
|
||||
// Returns an empty string when the server is closed.
|
||||
func (s *Server) Path() string {
|
||||
|
|
|
@ -907,7 +907,7 @@ type Server struct {
|
|||
|
||||
// NewServer returns a new test server instance.
|
||||
func NewServer() *Server {
|
||||
return &Server{influxdb.NewServer("X.X")}
|
||||
return &Server{influxdb.NewServer()}
|
||||
}
|
||||
|
||||
// OpenServer returns a new, open test server instance.
|
||||
|
|
Loading…
Reference in New Issue