diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 128a523ac7..e20d79afc0 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -14,6 +14,7 @@ import ( "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/collectd" "github.com/influxdb/influxdb/graphite" + "github.com/influxdb/influxdb/httpd" "github.com/influxdb/influxdb/messaging" ) @@ -41,6 +42,8 @@ func execRun(args []string) { // Print sweet InfluxDB logo and write the process id to file. log.Print(logo) + log.SetPrefix(`[srvr] `) + log.SetFlags(log.LstdFlags) writePIDFile(*pidPath) // Parse the configuration and determine if a broker and/or server exist. @@ -64,8 +67,7 @@ func execRun(args []string) { // Start the server handler. Attach to broker if listening on the same port. if s != nil { - sh := influxdb.NewHandler(s) - sh.AuthenticationEnabled = config.Authentication.Enabled + sh := httpd.NewHandler(s, config.Authentication.Enabled, version) if h != nil && config.BrokerAddr() == config.DataAddr() { h.serverHandler = sh } else { diff --git a/handler.go b/httpd/handler.go similarity index 51% rename from handler.go rename to httpd/handler.go index b1ebb20095..379eaf4c3e 100644 --- a/handler.go +++ b/httpd/handler.go @@ -1,17 +1,22 @@ -package influxdb +package httpd import ( "encoding/base64" "encoding/json" "errors" "fmt" + "log" "net/http" "net/url" + "os" "strconv" "strings" "time" + "code.google.com/p/go-uuid/uuid" + "github.com/bmizerany/pat" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/influxql" ) @@ -20,10 +25,323 @@ import ( // TODO: Check HTTP response codes: 400, 401, 403, 409. -// getUsernameAndPassword returns the username and password encoded in +type route struct { + name string + method string + pattern string + handlerFunc interface{} +} + +// Handler represents an HTTP handler for the InfluxDB server. +type Handler struct { + server *influxdb.Server + routes []route + mux *pat.PatternServeMux + requireAuthentication bool +} + +// NewHandler returns a new instance of Handler. +func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) *Handler { + h := &Handler{ + server: s, + mux: pat.New(), + requireAuthentication: requireAuthentication, + } + + weblog := log.New(os.Stderr, `[http] `, 0) + + h.routes = append(h.routes, + route{ + "query", // Query serving route. + "GET", "/query", h.serveQuery, + }, + route{ + "write", // Data-ingest route. + "POST", "/write", h.serveWrite, + }, + route{ // List data nodes + "data_nodes_index", + "GET", "/data_nodes", h.serveDataNodes, + }, + route{ // Create data node + "data_nodes_create", + "POST", "/data_nodes", h.serveCreateDataNode, + }, + route{ // Delete data node + "data_nodes_delete", + "DELETE", "/data_nodes/:id", h.serveDeleteDataNode, + }, + route{ // Metastore + "metastore", + "GET", "/metastore", h.serveMetastore, + }, + route{ // Ping + "ping", + "GET", "/ping", h.servePing, + }, + ) + + for _, r := range h.routes { + var handler http.Handler + + // If it's a handler func that requires authorization, wrap it in authorization + if hf, ok := r.handlerFunc.(func(http.ResponseWriter, *http.Request, *influxdb.User)); ok { + handler = authenticate(hf, h, requireAuthentication) + } + // This is a normal handler signature and does not require authorization + if hf, ok := r.handlerFunc.(func(http.ResponseWriter, *http.Request)); ok { + handler = http.HandlerFunc(hf) + } + + handler = versionHeader(handler, version) + handler = cors(handler) + handler = requestID(handler) + handler = logging(handler, r.name, weblog) + handler = recovery(handler, r.name, weblog) // make sure recovery is always last + + h.mux.Add(r.method, r.pattern, handler) + } + + return h +} + +//ServeHTTP responds to HTTP request to the handler. +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.mux.ServeHTTP(w, r) +} + +// serveQuery parses an incoming query and, if valid, executes the query. +func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influxdb.User) { + q := r.URL.Query() + p := influxql.NewParser(strings.NewReader(q.Get("q"))) + db := q.Get("db") + + // Parse query from query string. + query, err := p.ParseQuery() + if err != nil { + httpError(w, "error parsing query: "+err.Error(), http.StatusBadRequest) + return + } + + // If authentication is enabled and there are no users yet, make sure + // the first statement is creating a new cluster admin. + if h.requireAuthentication && h.server.UserCount() == 0 { + stmt, ok := query.Statements[0].(*influxql.CreateUserStatement) + if !ok || stmt.Privilege == nil || *stmt.Privilege != influxql.AllPrivileges { + httpError(w, "must create cluster admin", http.StatusUnauthorized) + return + } + } + + // Execute query. One result will return for each statement. + results := h.server.ExecuteQuery(query, db, user) + + // Send results to client. + httpResults(w, results) +} + +type batchWrite struct { + Points []influxdb.Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` +} + +// serveWrite receives incoming series data and writes it to the database. +func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) { + var br batchWrite + + dec := json.NewDecoder(r.Body) + dec.UseNumber() + + var writeError = func(result influxdb.Result, statusCode int) { + w.WriteHeader(statusCode) + w.Header().Add("content-type", "application/json") + _ = json.NewEncoder(w).Encode(&result) + return + } + + for { + if err := dec.Decode(&br); err != nil { + if err.Error() == "EOF" { + w.WriteHeader(http.StatusOK) + return + } + writeError(influxdb.Result{Err: err}, http.StatusInternalServerError) + return + } + + if br.Database == "" { + writeError(influxdb.Result{Err: fmt.Errorf("database is required")}, http.StatusInternalServerError) + return + } + + if !h.server.DatabaseExists(br.Database) { + writeError(influxdb.Result{Err: fmt.Errorf("database not found: %q", br.Database)}, http.StatusNotFound) + return + } + + if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, br.Database) { + writeError(influxdb.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name)}, http.StatusUnauthorized) + return + } + + for _, p := range br.Points { + if p.Timestamp.IsZero() { + p.Timestamp = br.Timestamp + } + if len(br.Tags) > 0 { + for k, _ := range br.Tags { + if p.Tags[k] == "" { + p.Tags[k] = br.Tags[k] + } + } + } + if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []influxdb.Point{p}); err != nil { + writeError(influxdb.Result{Err: err}, http.StatusInternalServerError) + return + } + } + } +} + +// serveMetastore returns a copy of the metastore. +func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request) { + // Set headers. + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Disposition", `attachment; filename="meta"`) + + if err := h.server.CopyMetastore(w); err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + } +} + +// servePing returns a simple response to let the client know the server is running. +func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) +} + +// serveDataNodes returns a list of all data nodes in the cluster. +func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request) { + // Generate a list of objects for encoding to the API. + a := make([]*dataNodeJSON, 0) + for _, n := range h.server.DataNodes() { + a = append(a, &dataNodeJSON{ + ID: n.ID, + URL: n.URL.String(), + }) + } + + w.Header().Add("content-type", "application/json") + _ = json.NewEncoder(w).Encode(a) +} + +// serveCreateDataNode creates a new data node in the cluster. +func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) { + // Read in data node from request body. + var n dataNodeJSON + if err := json.NewDecoder(r.Body).Decode(&n); err != nil { + httpError(w, err.Error(), http.StatusBadRequest) + return + } + + // Parse the URL. + u, err := url.Parse(n.URL) + if err != nil { + httpError(w, "invalid data node url", http.StatusBadRequest) + return + } + + // Create the data node. + if err := h.server.CreateDataNode(u); err == influxdb.ErrDataNodeExists { + httpError(w, err.Error(), http.StatusConflict) + return + } else if err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + // Retrieve data node reference. + node := h.server.DataNodeByURL(u) + + // Create a new replica on the broker. + if err := h.server.Client().CreateReplica(node.ID); err != nil { + httpError(w, err.Error(), http.StatusBadGateway) + return + } + + // Write new node back to client. + w.WriteHeader(http.StatusCreated) + w.Header().Add("content-type", "application/json") + _ = json.NewEncoder(w).Encode(&dataNodeJSON{ID: node.ID, URL: node.URL.String()}) +} + +// serveDeleteDataNode removes an existing node. +func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request) { + // Parse node id. + nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64) + if err != nil { + httpError(w, "invalid node id", http.StatusBadRequest) + return + } + + // Delete the node. + if err := h.server.DeleteDataNode(nodeID); err == influxdb.ErrDataNodeNotFound { + httpError(w, err.Error(), http.StatusNotFound) + return + } else if err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +type dataNodeJSON struct { + ID uint64 `json:"id"` + URL string `json:"url"` +} + +func isAuthorizationError(err error) bool { + type authorize interface { + authorize() + } + _, ok := err.(authorize) + return ok +} + +// httpResult writes a Results array to the client. +func httpResults(w http.ResponseWriter, results influxdb.Results) { + if results.Error() != nil { + if isAuthorizationError(results.Error()) { + w.WriteHeader(http.StatusUnauthorized) + } else { + w.WriteHeader(http.StatusInternalServerError) + } + } + w.Header().Add("content-type", "application/json") + _ = json.NewEncoder(w).Encode(results) +} + +// httpError writes an error to the client in a standard format. +func httpError(w http.ResponseWriter, error string, code int) { + var results influxdb.Results + results = append(results, &influxdb.Result{Err: errors.New(error)}) + + w.Header().Add("content-type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(results) +} + +// Filters and filter helpers + +// parseCredentials returns the username and password encoded in // a request. The credentials may be present as URL query params, or as // a Basic Authentication header. -func getUsernameAndPassword(r *http.Request) (string, string, error) { +// as params: http://127.0.0.1/query?u=username&p=password +// as basic auth: http://username:password@127.0.0.1 +func parseCredentials(r *http.Request) (string, string, error) { q := r.URL.Query() username, password := q.Get("u"), q.Get("p") if username != "" && password != "" { @@ -48,72 +366,23 @@ func getUsernameAndPassword(r *http.Request) (string, string, error) { return fields[0], fields[1], nil } -// Handler represents an HTTP handler for the InfluxDB server. -type Handler struct { - server *Server - mux *pat.PatternServeMux - - // Whether endpoints require authentication. - AuthenticationEnabled bool - - // The InfluxDB verion returned by the HTTP response header. - Version string -} - -// NewHandler returns a new instance of Handler. -func NewHandler(s *Server) *Handler { - h := &Handler{ - server: s, - mux: pat.New(), - } - - // Query serving route. - h.mux.Get("/query", h.makeAuthenticationHandler(h.serveQuery)) - - // Data-ingest route. - h.mux.Post("/write", h.makeAuthenticationHandler(h.serveWrite)) - - // Data node routes. - h.mux.Get("/data_nodes", h.makeAuthenticationHandler(h.serveDataNodes)) - h.mux.Post("/data_nodes", h.makeAuthenticationHandler(h.serveCreateDataNode)) - h.mux.Del("/data_nodes/:id", h.makeAuthenticationHandler(h.serveDeleteDataNode)) - - // Utilities - h.mux.Get("/metastore", h.makeAuthenticationHandler(h.serveMetastore)) - h.mux.Get("/ping", h.makeAuthenticationHandler(h.servePing)) - - return h -} - -// ServeHTTP responds to HTTP request to the handler. -func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Access-Control-Allow-Origin", "*") - w.Header().Add("Access-Control-Max-Age", "2592000") - w.Header().Add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE") - w.Header().Add("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept") - w.Header().Add("X-Influxdb-Version", h.Version) - - // If this is a CORS OPTIONS request then send back okie-dokie. - if r.Method == "OPTIONS" { - w.WriteHeader(http.StatusOK) - return - } - - // Otherwise handle it via pat. - h.mux.ServeHTTP(w, r) -} - -// makeAuthenticationHandler takes a custom handler and returns a standard handler, ensuring that -// if user credentials are passed in, an attempt is made to authenticate that user. If authentication -// fails, an error is returned to the user. +// authenticate wraps a handler and ensures that if user credentials are passed in +// an attempt is made to authenticate that user. If authentication fails, an error is returned. // // There is one exception: if there are no users in the system, authentication is not required. This // is to facilitate bootstrapping of a system with authentication enabled. -func (h *Handler) makeAuthenticationHandler(fn func(http.ResponseWriter, *http.Request, *User)) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - var user *User - if h.AuthenticationEnabled && h.server.UserCount() > 0 { - username, password, err := getUsernameAndPassword(r) +func authenticate(inner func(http.ResponseWriter, *http.Request, *influxdb.User), h *Handler, requireAuthentication bool) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Return early if we are not authenticating + if !requireAuthentication { + inner(w, r, nil) + return + } + var user *influxdb.User + + // TODO corylanou: never allow this in the future without users + if requireAuthentication && h.server.UserCount() > 0 { + username, password, err := parseCredentials(r) if err != nil { httpError(w, err.Error(), http.StatusUnauthorized) return @@ -129,222 +398,81 @@ func (h *Handler) makeAuthenticationHandler(fn func(http.ResponseWriter, *http.R return } } - fn(w, r, user) - } + inner(w, r, user) + }) } -// serveQuery parses an incoming query and, if valid, executes the query. -func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, u *User) { - q := r.URL.Query() - p := influxql.NewParser(strings.NewReader(q.Get("q"))) - db := q.Get("db") +// versionHeader taks a HTTP handler and returns a HTTP handler +// and adds the X-INFLUXBD-VERSION header to outgoing responses. +func versionHeader(inner http.Handler, version string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("X-Influxdb-Version", version) + inner.ServeHTTP(w, r) + }) +} - // Parse query from query string. - query, err := p.ParseQuery() - if err != nil { - httpError(w, "error parsing query: "+err.Error(), http.StatusBadRequest) - return - } +// cors responds to incoming requests and adds the appropriate cors headers +// TODO: corylanou: add the ability to configure this in our config +func cors(inner http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if origin := r.Header.Get("Origin"); origin != "" { + w.Header().Set(`Access-Control-Allow-Origin`, origin) + w.Header().Set(`Access-Control-Allow-Methods`, strings.Join([]string{ + `DELETE`, + `GET`, + `OPTIONS`, + `POST`, + `PUT`, + }, ", ")) - // If authentication is enabled and there are no users yet, make sure - // the first statement is creating a new cluster admin. - if h.AuthenticationEnabled && h.server.UserCount() == 0 { - stmt, ok := query.Statements[0].(*influxql.CreateUserStatement) - if !ok || stmt.Privilege == nil || *stmt.Privilege != influxql.AllPrivileges { - httpError(w, "must create cluster admin", http.StatusUnauthorized) - return + w.Header().Set(`Access-Control-Allow-Headers`, strings.Join([]string{ + `Accept`, + `Accept-Encoding`, + `Authorization`, + `Content-Length`, + `Content-Type`, + `X-CSRF-Token`, + `X-HTTP-Method-Override`, + }, ", ")) } - } - // Execute query. One result will return for each statement. - results := h.server.ExecuteQuery(query, db, u) - - // Send results to client. - httpResults(w, results) -} - -type batchWrite struct { - Points []Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Tags map[string]string `json:"tags"` - Timestamp time.Time `json:"timestamp"` -} - -// serveWrite receives incoming series data and writes it to the database. -func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, u *User) { - var br batchWrite - - dec := json.NewDecoder(r.Body) - dec.UseNumber() - - var writeError = func(result Result, statusCode int) { - w.WriteHeader(statusCode) - w.Header().Add("content-type", "application/json") - _ = json.NewEncoder(w).Encode(&result) - return - } - - for { - if err := dec.Decode(&br); err != nil { - if err.Error() == "EOF" { - w.WriteHeader(http.StatusOK) - return - } - writeError(Result{Err: err}, http.StatusInternalServerError) + if r.Method == "OPTIONS" { return } - if br.Database == "" { - writeError(Result{Err: fmt.Errorf("database is required")}, http.StatusInternalServerError) - return + inner.ServeHTTP(w, r) + }) +} + +func requestID(inner http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + uid := uuid.NewUUID() + r.Header.Set("Request-Id", uid.String()) + w.Header().Set("Request-Id", r.Header.Get("Request-Id")) + + inner.ServeHTTP(w, r) + }) +} + +func logging(inner http.Handler, name string, weblog *log.Logger) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + l := &responseLogger{w: w} + inner.ServeHTTP(l, r) + logLine := buildLogLine(l, r, start) + weblog.Println(logLine) + }) +} + +func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + l := &responseLogger{w: w} + inner.ServeHTTP(l, r) + if err := recover(); err != nil { + logLine := buildLogLine(l, r, start) + logLine = fmt.Sprintf(`%s [err:%s]`, logLine, err) + weblog.Println(logLine) } - - if h.server.databases[br.Database] == nil { - writeError(Result{Err: fmt.Errorf("database not found: %q", br.Database)}, http.StatusNotFound) - return - } - - // TODO corylanou: Check if user can write to specified database - //if !user_can_write(br.Database) { - //writeError(&Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", u.Name)}, http.StatusUnauthorized) - //return - //} - - for _, p := range br.Points { - if p.Timestamp.IsZero() { - p.Timestamp = br.Timestamp - } - if len(br.Tags) > 0 { - for k, _ := range br.Tags { - if p.Tags[k] == "" { - p.Tags[k] = br.Tags[k] - } - } - } - if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []Point{p}); err != nil { - writeError(Result{Err: err}, http.StatusInternalServerError) - return - } - } - } -} - -// serveMetastore returns a copy of the metastore. -func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request, u *User) { - // Set headers. - w.Header().Set("Content-Type", "application/octet-stream") - w.Header().Set("Content-Disposition", `attachment; filename="meta"`) - - if err := h.server.CopyMetastore(w); err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - } -} - -// servePing returns a simple response to let the client know the server is running. -func (h *Handler) servePing(w http.ResponseWriter, r *http.Request, u *User) {} - -// serveDataNodes returns a list of all data nodes in the cluster. -func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request, u *User) { - // Generate a list of objects for encoding to the API. - a := make([]*dataNodeJSON, 0) - for _, n := range h.server.DataNodes() { - a = append(a, &dataNodeJSON{ - ID: n.ID, - URL: n.URL.String(), - }) - } - - w.Header().Add("content-type", "application/json") - _ = json.NewEncoder(w).Encode(a) -} - -// serveCreateDataNode creates a new data node in the cluster. -func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request, _ *User) { - // Read in data node from request body. - var n dataNodeJSON - if err := json.NewDecoder(r.Body).Decode(&n); err != nil { - httpError(w, err.Error(), http.StatusBadRequest) - return - } - - // Parse the URL. - u, err := url.Parse(n.URL) - if err != nil { - httpError(w, "invalid data node url", http.StatusBadRequest) - return - } - - // Create the data node. - if err := h.server.CreateDataNode(u); err == ErrDataNodeExists { - httpError(w, err.Error(), http.StatusConflict) - return - } else if err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - // Retrieve data node reference. - node := h.server.DataNodeByURL(u) - - // Create a new replica on the broker. - if err := h.server.client.CreateReplica(node.ID); err != nil { - httpError(w, err.Error(), http.StatusBadGateway) - return - } - - // Write new node back to client. - w.WriteHeader(http.StatusCreated) - w.Header().Add("content-type", "application/json") - _ = json.NewEncoder(w).Encode(&dataNodeJSON{ID: node.ID, URL: node.URL.String()}) -} - -// serveDeleteDataNode removes an existing node. -func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request, u *User) { - // Parse node id. - nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64) - if err != nil { - httpError(w, "invalid node id", http.StatusBadRequest) - return - } - - // Delete the node. - if err := h.server.DeleteDataNode(nodeID); err == ErrDataNodeNotFound { - httpError(w, err.Error(), http.StatusNotFound) - return - } else if err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusNoContent) -} - -type dataNodeJSON struct { - ID uint64 `json:"id"` - URL string `json:"url"` -} - -// httpResult writes a Results array to the client. -func httpResults(w http.ResponseWriter, results Results) { - if results.Error() != nil { - if isAuthorizationError(results.Error()) { - w.WriteHeader(http.StatusUnauthorized) - } else { - w.WriteHeader(http.StatusInternalServerError) - } - } - w.Header().Add("content-type", "application/json") - _ = json.NewEncoder(w).Encode(results) -} - -// httpError writes an error to the client in a standard format. -func httpError(w http.ResponseWriter, error string, code int) { - results := Results{ - &Result{Err: errors.New(error)}, - } - - w.Header().Add("content-type", "application/json") - w.WriteHeader(code) - _ = json.NewEncoder(w).Encode(results) + }) } diff --git a/handler_test.go b/httpd/handler_test.go similarity index 87% rename from handler_test.go rename to httpd/handler_test.go index 9b92e3ca69..ca2bf3a264 100644 --- a/handler_test.go +++ b/httpd/handler_test.go @@ -1,4 +1,4 @@ -package influxdb_test +package httpd_test import ( "bytes" @@ -7,10 +7,13 @@ import ( "net/http" "net/http/httptest" "net/url" + "os" "strings" "testing" "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/httpd" + "github.com/influxdb/influxdb/messaging" ) func init() { @@ -309,7 +312,7 @@ func TestHandler_Ping(t *testing.T) { status, _ := MustHTTP("GET", s.URL+`/ping`, nil, nil, "") - if status != http.StatusOK { + if status != http.StatusNoContent { t.Fatalf("unexpected status: %d", status) } } @@ -780,20 +783,122 @@ func MustParseURL(s string) *url.URL { // Server is a test HTTP server that wraps a handler type HTTPServer struct { *httptest.Server - Handler *influxdb.Handler + Handler *httpd.Handler } func NewHTTPServer(s *Server) *HTTPServer { - h := influxdb.NewHandler(s.Server) + h := httpd.NewHandler(s.Server, false, "X.X") return &HTTPServer{httptest.NewServer(h), h} } func NewAuthenticatedHTTPServer(s *Server) *HTTPServer { - h := influxdb.NewHandler(s.Server) - h.AuthenticationEnabled = true + h := httpd.NewHandler(s.Server, true, "X.X") return &HTTPServer{httptest.NewServer(h), h} } func (s *HTTPServer) Close() { s.Server.Close() } + +// Server is a wrapping test struct for influxdb.Server. +type Server struct { + *influxdb.Server +} + +// NewServer returns a new test server instance. +func NewServer() *Server { + return &Server{influxdb.NewServer()} +} + +// OpenServer returns a new, open test server instance. +func OpenServer(client influxdb.MessagingClient) *Server { + s := OpenUninitializedServer(client) + if err := s.Initialize(&url.URL{Host: "127.0.0.1:8080"}); err != nil { + panic(err.Error()) + } + return s +} + +// OpenUninitializedServer returns a new, uninitialized, open test server instance. +func OpenUninitializedServer(client influxdb.MessagingClient) *Server { + s := NewServer() + if err := s.Open(tempfile()); err != nil { + panic(err.Error()) + } + if err := s.SetClient(client); err != nil { + panic(err.Error()) + } + return s +} + +// TODO corylanou: evaluate how much of this should be in this package +// vs. how much should be a mocked out interface +// MessagingClient represents a test client for the messaging broker. +type MessagingClient struct { + index uint64 + c chan *messaging.Message + + PublishFunc func(*messaging.Message) (uint64, error) + CreateReplicaFunc func(replicaID uint64) error + DeleteReplicaFunc func(replicaID uint64) error + SubscribeFunc func(replicaID, topicID uint64) error + UnsubscribeFunc func(replicaID, topicID uint64) error +} + +// NewMessagingClient returns a new instance of MessagingClient. +func NewMessagingClient() *MessagingClient { + c := &MessagingClient{c: make(chan *messaging.Message, 1)} + c.PublishFunc = c.send + c.CreateReplicaFunc = func(replicaID uint64) error { return nil } + c.DeleteReplicaFunc = func(replicaID uint64) error { return nil } + c.SubscribeFunc = func(replicaID, topicID uint64) error { return nil } + c.UnsubscribeFunc = func(replicaID, topicID uint64) error { return nil } + return c +} + +// Publish attaches an autoincrementing index to the message. +// This function also execute's the client's PublishFunc mock function. +func (c *MessagingClient) Publish(m *messaging.Message) (uint64, error) { + c.index++ + m.Index = c.index + return c.PublishFunc(m) +} + +// send sends the message through to the channel. +// This is the default value of PublishFunc. +func (c *MessagingClient) send(m *messaging.Message) (uint64, error) { + c.c <- m + return m.Index, nil +} + +// Creates a new replica with a given ID on the broker. +func (c *MessagingClient) CreateReplica(replicaID uint64) error { + return c.CreateReplicaFunc(replicaID) +} + +// Deletes an existing replica with a given ID from the broker. +func (c *MessagingClient) DeleteReplica(replicaID uint64) error { + return c.DeleteReplicaFunc(replicaID) +} + +// Subscribe adds a subscription to a replica for a topic on the broker. +func (c *MessagingClient) Subscribe(replicaID, topicID uint64) error { + return c.SubscribeFunc(replicaID, topicID) +} + +// Unsubscribe removes a subscrition from a replica for a topic on the broker. +func (c *MessagingClient) Unsubscribe(replicaID, topicID uint64) error { + return c.UnsubscribeFunc(replicaID, topicID) +} + +// C returns a channel for streaming message. +func (c *MessagingClient) C() <-chan *messaging.Message { return c.c } + +// tempfile returns a temporary path. +func tempfile() string { + f, _ := ioutil.TempFile("", "influxdb-") + path := f.Name() + f.Close() + os.Remove(path) + return path +} diff --git a/httpd/response_logger.go b/httpd/response_logger.go new file mode 100644 index 0000000000..1e15854f1a --- /dev/null +++ b/httpd/response_logger.go @@ -0,0 +1,130 @@ +package httpd + +import ( + "encoding/base64" + "fmt" + "net" + "net/http" + "strconv" + "strings" + "time" +) + +type loggingResponseWriter interface { + http.ResponseWriter + Status() int + Size() int +} + +// responseLogger is wrapper of http.ResponseWriter that keeps track of its HTTP status +// code and body size +type responseLogger struct { + w http.ResponseWriter + status int + size int +} + +func (l *responseLogger) Header() http.Header { + return l.w.Header() +} + +func (l *responseLogger) Write(b []byte) (int, error) { + if l.status == 0 { + // Set status if WriteHeader has not been called + l.status = http.StatusOK + } + size, err := l.w.Write(b) + l.size += size + return size, err +} + +func (l *responseLogger) WriteHeader(s int) { + l.w.WriteHeader(s) + l.status = s +} + +func (l *responseLogger) Status() int { + return l.status +} + +func (l *responseLogger) Size() int { + return l.size +} + +// Common Log Format: http://en.wikipedia.org/wiki/Common_Log_Format + +// buildLogLine creates a common log format +// in addittion to the common fields, we also append referrer, user agent and request ID +func buildLogLine(l *responseLogger, r *http.Request, start time.Time) string { + username := parseUsername(r) + + host, _, err := net.SplitHostPort(r.RemoteAddr) + + if err != nil { + host = r.RemoteAddr + } + + uri := r.URL.RequestURI() + + referer := r.Referer() + + userAgent := r.UserAgent() + + fields := []string{ + host, + "-", + detect(username, "-"), + fmt.Sprintf("[%s]", start.Format("02/Jan/2006:15:04:05 -0700")), + r.Method, + uri, + r.Proto, + detect(strconv.Itoa(l.Status()), "-"), + strconv.Itoa(l.Size()), + detect(referer, "-"), + detect(userAgent, "-"), + r.Header.Get("Request-Id"), + } + + return strings.Join(fields, " ") +} + +// detect detects the first presense of a non blank string and returns it +func detect(values ...string) string { + for _, v := range values { + if v != "" { + return v + } + } + return "" +} + +// parses the uesrname either from the url or auth header +func parseUsername(r *http.Request) string { + var ( + username = "" + url = r.URL + ) + + // get username from the url if passed there + if url.User != nil { + if name := url.User.Username(); name != "" { + username = name + } + } + + // Try to get it from the authorization header if set there + if username == "" { + auth := r.Header.Get("Authorization") + fields := strings.Split(auth, " ") + if len(fields) == 2 { + bs, err := base64.StdEncoding.DecodeString(fields[1]) + if err == nil { + fields = strings.Split(string(bs), ":") + if len(fields) >= 1 { + username = fields[0] + } + } + } + } + return username +} diff --git a/influxql/ast.go b/influxql/ast.go index 3c588d91bc..8749fa4f70 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -233,7 +233,7 @@ func (s *CreateDatabaseStatement) String() string { // RequiredPrivilege returns the privilege required to execute a CreateDatabaseStatement. func (s *CreateDatabaseStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},} + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} } // DropDatabaseStatement represents a command to drop a database. @@ -252,7 +252,7 @@ func (s *DropDatabaseStatement) String() string { // RequiredPrivilege returns the privilege required to execute a DropDatabaseStatement. func (s *DropDatabaseStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},} + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} } // DropRetentionPolicyStatement represents a command to drop a retention policy from a database. @@ -276,7 +276,7 @@ func (s *DropRetentionPolicyStatement) String() string { // RequiredPrivilege returns the privilege required to execute a DropRetentionPolicyStatement. func (s *DropRetentionPolicyStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: s.Database, Privilege: WritePrivilege,},} + return ExecutionPrivileges{{Name: s.Database, Privilege: WritePrivilege}} } // CreateUserStatement represents a command for creating a new user. @@ -309,7 +309,7 @@ func (s *CreateUserStatement) String() string { // RequiredPrivilege returns the privilege(s) required to execute a CreateUserStatement. func (s *CreateUserStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},} + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} } // DropUserStatement represents a command for dropping a user. @@ -328,7 +328,7 @@ func (s *DropUserStatement) String() string { // RequiredPrivilege returns the privilege(s) required to execute a DropUserStatement. func (s *DropUserStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},} + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} } // Privilege is a type of action a user can be granted the right to use. @@ -387,7 +387,7 @@ func (s *GrantStatement) String() string { // RequiredPrivilege returns the privilege required to execute a GrantStatement. func (s *GrantStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},} + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} } // RevokeStatement represents a command to revoke a privilege from a user. @@ -418,7 +418,7 @@ func (s *RevokeStatement) String() string { // RequiredPrivilege returns the privilege required to execute a RevokeStatement. func (s *RevokeStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},} + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} } // CreateRetentionPolicyStatement represents a command to create a retention policy. @@ -458,7 +458,7 @@ func (s *CreateRetentionPolicyStatement) String() string { // RequiredPrivilege returns the privilege required to execute a CreateRetentionPolicyStatement. func (s *CreateRetentionPolicyStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},} + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} } // AlterRetentionPolicyStatement represents a command to alter an existing retention policy. @@ -506,7 +506,7 @@ func (s *AlterRetentionPolicyStatement) String() string { // RequiredPrivilege returns the privilege required to execute an AlterRetentionPolicyStatement. func (s *AlterRetentionPolicyStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},} + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} } // SelectStatement represents a command for extracting data from the database. @@ -566,10 +566,10 @@ func (s *SelectStatement) String() string { // RequiredPrivilege returns the privilege required to execute the SelectStatement. func (s *SelectStatement) RequiredPrivileges() ExecutionPrivileges { - ep := ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},} + ep := ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}} if s.Target != nil { - p := ExecutionPrivilege{Name: s.Target.Database, Privilege: WritePrivilege,} + p := ExecutionPrivilege{Name: s.Target.Database, Privilege: WritePrivilege} ep = append(ep, p) } return ep @@ -759,7 +759,7 @@ func (s *DeleteStatement) String() string { // RequiredPrivilege returns the privilege required to execute a DeleteStatement. func (s *DeleteStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege,},} + return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege}} } // ListSeriesStatement represents a command for listing series in the database. @@ -797,7 +797,7 @@ func (s *ListSeriesStatement) String() string { // RequiredPrivilege returns the privilege required to execute a ListSeriesStatement. func (s *ListSeriesStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},} + return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}} } // DropSeriesStatement represents a command for removing a series from the database. @@ -810,7 +810,7 @@ func (s *DropSeriesStatement) String() string { return fmt.Sprintf("DROP SERIES // RequiredPrivilege returns the privilige reqired to execute a DropSeriesStatement. func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege,},} + return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege}} } // ListContinuousQueriesStatement represents a command for listing continuous queries. @@ -821,7 +821,7 @@ func (s *ListContinuousQueriesStatement) String() string { return "LIST CONTINUO // RequiredPrivilege returns the privilege required to execute a ListContinuousQueriesStatement. func (s *ListContinuousQueriesStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},} + return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}} } // ListDatabasesStatement represents a command for listing all databases in the cluster. @@ -832,7 +832,7 @@ func (s *ListDatabasesStatement) String() string { return "LIST DATABASES" } // RequiredPrivilege returns the privilege required to execute a ListDatabasesStatement func (s *ListDatabasesStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},} + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} } // CreateContinuousQueriesStatement represents a command for creating a continuous query. @@ -854,7 +854,7 @@ func (s *CreateContinuousQueryStatement) String() string { // RequiredPrivilege returns the privilege required to execute a CreateContinuousQueryStatement. func (s *CreateContinuousQueryStatement) RequiredPrivileges() ExecutionPrivileges { - ep := ExecutionPrivileges{{Name: s.Database, Privilege: ReadPrivilege,},} + ep := ExecutionPrivileges{{Name: s.Database, Privilege: ReadPrivilege}} // Selecting into a database that's different from the source? if s.Source.Target.Database != "" { @@ -863,7 +863,7 @@ func (s *CreateContinuousQueryStatement) RequiredPrivileges() ExecutionPrivilege // Add destination database privilege requirement and set it to write. p := ExecutionPrivilege{ - Name: s.Source.Target.Database, + Name: s.Source.Target.Database, Privilege: WritePrivilege, } ep = append(ep, p) @@ -884,7 +884,7 @@ func (s *DropContinuousQueryStatement) String() string { // RequiredPrivileges returns the privilege(s) required to execute a DropContinuousQueryStatement func (s *DropContinuousQueryStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege,},} + return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege}} } // ListMeasurementsStatement represents a command for listing measurements. @@ -922,7 +922,7 @@ func (s *ListMeasurementsStatement) String() string { // RequiredPrivileges returns the privilege(s) required to execute a ListMeasurementsStatement func (s *ListMeasurementsStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},} + return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}} } // ListRetentionPoliciesStatement represents a command for listing retention policies. @@ -941,7 +941,7 @@ func (s *ListRetentionPoliciesStatement) String() string { // RequiredPrivileges returns the privilege(s) required to execute a ListRetentionPoliciesStatement func (s *ListRetentionPoliciesStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},} + return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}} } // ListTagKeysStatement represents a command for listing tag keys. @@ -986,7 +986,7 @@ func (s *ListTagKeysStatement) String() string { // RequiredPrivileges returns the privilege(s) required to execute a ListTagKeysStatement func (s *ListTagKeysStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},} + return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}} } // ListTagValuesStatement represents a command for listing tag values. @@ -1031,7 +1031,7 @@ func (s *ListTagValuesStatement) String() string { // RequiredPrivileges returns the privilege(s) required to execute a ListTagValuesStatement func (s *ListTagValuesStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},} + return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}} } // ListUsersStatement represents a command for listing users. @@ -1044,7 +1044,7 @@ func (s *ListUsersStatement) String() string { // RequiredPrivileges returns the privilege(s) required to execute a ListUsersStatement func (s *ListUsersStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},} + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} } // ListFieldKeyStatement represents a command for listing field keys. @@ -1089,7 +1089,7 @@ func (s *ListFieldKeysStatement) String() string { // RequiredPrivileges returns the privilege(s) required to execute a ListFieldKeysStatement func (s *ListFieldKeysStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},} + return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}} } // ListFieldValuesStatement represents a command for listing field values. @@ -1134,7 +1134,7 @@ func (s *ListFieldValuesStatement) String() string { // RequiredPrivileges returns the privilege(s) required to execute a ListFieldValuesStatement func (s *ListFieldValuesStatement) RequiredPrivileges() ExecutionPrivileges { - return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},} + return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}} } // Fields represents a list of fields. diff --git a/server.go b/server.go index f49af983d7..91568680d5 100644 --- a/server.go +++ b/server.go @@ -332,6 +332,13 @@ func (s *Server) Initialize(u *url.URL) error { return nil } +// This is the same struct we use in the httpd package, but +// it seems overkill to export it and share it +type dataNodeJSON struct { + ID uint64 `json:"id"` + URL string `json:"url"` +} + // Join creates a new data node in an existing cluster, copies the metastore, // and initializes the ID. func (s *Server) Join(u *url.URL, joinURL *url.URL) error {