Merge pull request #1340 from influxdb/log-format

Move handlers and associated http functionality to httpd package
pull/1350/head
Cory LaNou 2015-01-22 14:22:01 -07:00
commit 5321a246be
6 changed files with 680 additions and 308 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/collectd"
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/messaging"
)
@ -41,6 +42,8 @@ func execRun(args []string) {
// Print sweet InfluxDB logo and write the process id to file.
log.Print(logo)
log.SetPrefix(`[srvr] `)
log.SetFlags(log.LstdFlags)
writePIDFile(*pidPath)
// Parse the configuration and determine if a broker and/or server exist.
@ -64,8 +67,7 @@ func execRun(args []string) {
// Start the server handler. Attach to broker if listening on the same port.
if s != nil {
sh := influxdb.NewHandler(s)
sh.AuthenticationEnabled = config.Authentication.Enabled
sh := httpd.NewHandler(s, config.Authentication.Enabled, version)
if h != nil && config.BrokerAddr() == config.DataAddr() {
h.serverHandler = sh
} else {

View File

@ -1,17 +1,22 @@
package influxdb
package httpd
import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/bmizerany/pat"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
)
@ -20,10 +25,323 @@ import (
// TODO: Check HTTP response codes: 400, 401, 403, 409.
// getUsernameAndPassword returns the username and password encoded in
type route struct {
name string
method string
pattern string
handlerFunc interface{}
}
// Handler represents an HTTP handler for the InfluxDB server.
type Handler struct {
server *influxdb.Server
routes []route
mux *pat.PatternServeMux
requireAuthentication bool
}
// NewHandler returns a new instance of Handler.
func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) *Handler {
h := &Handler{
server: s,
mux: pat.New(),
requireAuthentication: requireAuthentication,
}
weblog := log.New(os.Stderr, `[http] `, 0)
h.routes = append(h.routes,
route{
"query", // Query serving route.
"GET", "/query", h.serveQuery,
},
route{
"write", // Data-ingest route.
"POST", "/write", h.serveWrite,
},
route{ // List data nodes
"data_nodes_index",
"GET", "/data_nodes", h.serveDataNodes,
},
route{ // Create data node
"data_nodes_create",
"POST", "/data_nodes", h.serveCreateDataNode,
},
route{ // Delete data node
"data_nodes_delete",
"DELETE", "/data_nodes/:id", h.serveDeleteDataNode,
},
route{ // Metastore
"metastore",
"GET", "/metastore", h.serveMetastore,
},
route{ // Ping
"ping",
"GET", "/ping", h.servePing,
},
)
for _, r := range h.routes {
var handler http.Handler
// If it's a handler func that requires authorization, wrap it in authorization
if hf, ok := r.handlerFunc.(func(http.ResponseWriter, *http.Request, *influxdb.User)); ok {
handler = authenticate(hf, h, requireAuthentication)
}
// This is a normal handler signature and does not require authorization
if hf, ok := r.handlerFunc.(func(http.ResponseWriter, *http.Request)); ok {
handler = http.HandlerFunc(hf)
}
handler = versionHeader(handler, version)
handler = cors(handler)
handler = requestID(handler)
handler = logging(handler, r.name, weblog)
handler = recovery(handler, r.name, weblog) // make sure recovery is always last
h.mux.Add(r.method, r.pattern, handler)
}
return h
}
//ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.mux.ServeHTTP(w, r)
}
// serveQuery parses an incoming query and, if valid, executes the query.
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
q := r.URL.Query()
p := influxql.NewParser(strings.NewReader(q.Get("q")))
db := q.Get("db")
// Parse query from query string.
query, err := p.ParseQuery()
if err != nil {
httpError(w, "error parsing query: "+err.Error(), http.StatusBadRequest)
return
}
// If authentication is enabled and there are no users yet, make sure
// the first statement is creating a new cluster admin.
if h.requireAuthentication && h.server.UserCount() == 0 {
stmt, ok := query.Statements[0].(*influxql.CreateUserStatement)
if !ok || stmt.Privilege == nil || *stmt.Privilege != influxql.AllPrivileges {
httpError(w, "must create cluster admin", http.StatusUnauthorized)
return
}
}
// Execute query. One result will return for each statement.
results := h.server.ExecuteQuery(query, db, user)
// Send results to client.
httpResults(w, results)
}
type batchWrite struct {
Points []influxdb.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
}
// serveWrite receives incoming series data and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
var br batchWrite
dec := json.NewDecoder(r.Body)
dec.UseNumber()
var writeError = func(result influxdb.Result, statusCode int) {
w.WriteHeader(statusCode)
w.Header().Add("content-type", "application/json")
_ = json.NewEncoder(w).Encode(&result)
return
}
for {
if err := dec.Decode(&br); err != nil {
if err.Error() == "EOF" {
w.WriteHeader(http.StatusOK)
return
}
writeError(influxdb.Result{Err: err}, http.StatusInternalServerError)
return
}
if br.Database == "" {
writeError(influxdb.Result{Err: fmt.Errorf("database is required")}, http.StatusInternalServerError)
return
}
if !h.server.DatabaseExists(br.Database) {
writeError(influxdb.Result{Err: fmt.Errorf("database not found: %q", br.Database)}, http.StatusNotFound)
return
}
if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, br.Database) {
writeError(influxdb.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name)}, http.StatusUnauthorized)
return
}
for _, p := range br.Points {
if p.Timestamp.IsZero() {
p.Timestamp = br.Timestamp
}
if len(br.Tags) > 0 {
for k, _ := range br.Tags {
if p.Tags[k] == "" {
p.Tags[k] = br.Tags[k]
}
}
}
if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []influxdb.Point{p}); err != nil {
writeError(influxdb.Result{Err: err}, http.StatusInternalServerError)
return
}
}
}
}
// serveMetastore returns a copy of the metastore.
func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request) {
// Set headers.
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", `attachment; filename="meta"`)
if err := h.server.CopyMetastore(w); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
// servePing returns a simple response to let the client know the server is running.
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// serveDataNodes returns a list of all data nodes in the cluster.
func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request) {
// Generate a list of objects for encoding to the API.
a := make([]*dataNodeJSON, 0)
for _, n := range h.server.DataNodes() {
a = append(a, &dataNodeJSON{
ID: n.ID,
URL: n.URL.String(),
})
}
w.Header().Add("content-type", "application/json")
_ = json.NewEncoder(w).Encode(a)
}
// serveCreateDataNode creates a new data node in the cluster.
func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) {
// Read in data node from request body.
var n dataNodeJSON
if err := json.NewDecoder(r.Body).Decode(&n); err != nil {
httpError(w, err.Error(), http.StatusBadRequest)
return
}
// Parse the URL.
u, err := url.Parse(n.URL)
if err != nil {
httpError(w, "invalid data node url", http.StatusBadRequest)
return
}
// Create the data node.
if err := h.server.CreateDataNode(u); err == influxdb.ErrDataNodeExists {
httpError(w, err.Error(), http.StatusConflict)
return
} else if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
// Retrieve data node reference.
node := h.server.DataNodeByURL(u)
// Create a new replica on the broker.
if err := h.server.Client().CreateReplica(node.ID); err != nil {
httpError(w, err.Error(), http.StatusBadGateway)
return
}
// Write new node back to client.
w.WriteHeader(http.StatusCreated)
w.Header().Add("content-type", "application/json")
_ = json.NewEncoder(w).Encode(&dataNodeJSON{ID: node.ID, URL: node.URL.String()})
}
// serveDeleteDataNode removes an existing node.
func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request) {
// Parse node id.
nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64)
if err != nil {
httpError(w, "invalid node id", http.StatusBadRequest)
return
}
// Delete the node.
if err := h.server.DeleteDataNode(nodeID); err == influxdb.ErrDataNodeNotFound {
httpError(w, err.Error(), http.StatusNotFound)
return
} else if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
type dataNodeJSON struct {
ID uint64 `json:"id"`
URL string `json:"url"`
}
func isAuthorizationError(err error) bool {
type authorize interface {
authorize()
}
_, ok := err.(authorize)
return ok
}
// httpResult writes a Results array to the client.
func httpResults(w http.ResponseWriter, results influxdb.Results) {
if results.Error() != nil {
if isAuthorizationError(results.Error()) {
w.WriteHeader(http.StatusUnauthorized)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
}
w.Header().Add("content-type", "application/json")
_ = json.NewEncoder(w).Encode(results)
}
// httpError writes an error to the client in a standard format.
func httpError(w http.ResponseWriter, error string, code int) {
var results influxdb.Results
results = append(results, &influxdb.Result{Err: errors.New(error)})
w.Header().Add("content-type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(results)
}
// Filters and filter helpers
// parseCredentials returns the username and password encoded in
// a request. The credentials may be present as URL query params, or as
// a Basic Authentication header.
func getUsernameAndPassword(r *http.Request) (string, string, error) {
// as params: http://127.0.0.1/query?u=username&p=password
// as basic auth: http://username:password@127.0.0.1
func parseCredentials(r *http.Request) (string, string, error) {
q := r.URL.Query()
username, password := q.Get("u"), q.Get("p")
if username != "" && password != "" {
@ -48,72 +366,23 @@ func getUsernameAndPassword(r *http.Request) (string, string, error) {
return fields[0], fields[1], nil
}
// Handler represents an HTTP handler for the InfluxDB server.
type Handler struct {
server *Server
mux *pat.PatternServeMux
// Whether endpoints require authentication.
AuthenticationEnabled bool
// The InfluxDB verion returned by the HTTP response header.
Version string
}
// NewHandler returns a new instance of Handler.
func NewHandler(s *Server) *Handler {
h := &Handler{
server: s,
mux: pat.New(),
}
// Query serving route.
h.mux.Get("/query", h.makeAuthenticationHandler(h.serveQuery))
// Data-ingest route.
h.mux.Post("/write", h.makeAuthenticationHandler(h.serveWrite))
// Data node routes.
h.mux.Get("/data_nodes", h.makeAuthenticationHandler(h.serveDataNodes))
h.mux.Post("/data_nodes", h.makeAuthenticationHandler(h.serveCreateDataNode))
h.mux.Del("/data_nodes/:id", h.makeAuthenticationHandler(h.serveDeleteDataNode))
// Utilities
h.mux.Get("/metastore", h.makeAuthenticationHandler(h.serveMetastore))
h.mux.Get("/ping", h.makeAuthenticationHandler(h.servePing))
return h
}
// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Access-Control-Allow-Origin", "*")
w.Header().Add("Access-Control-Max-Age", "2592000")
w.Header().Add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE")
w.Header().Add("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept")
w.Header().Add("X-Influxdb-Version", h.Version)
// If this is a CORS OPTIONS request then send back okie-dokie.
if r.Method == "OPTIONS" {
w.WriteHeader(http.StatusOK)
return
}
// Otherwise handle it via pat.
h.mux.ServeHTTP(w, r)
}
// makeAuthenticationHandler takes a custom handler and returns a standard handler, ensuring that
// if user credentials are passed in, an attempt is made to authenticate that user. If authentication
// fails, an error is returned to the user.
// authenticate wraps a handler and ensures that if user credentials are passed in
// an attempt is made to authenticate that user. If authentication fails, an error is returned.
//
// There is one exception: if there are no users in the system, authentication is not required. This
// is to facilitate bootstrapping of a system with authentication enabled.
func (h *Handler) makeAuthenticationHandler(fn func(http.ResponseWriter, *http.Request, *User)) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var user *User
if h.AuthenticationEnabled && h.server.UserCount() > 0 {
username, password, err := getUsernameAndPassword(r)
func authenticate(inner func(http.ResponseWriter, *http.Request, *influxdb.User), h *Handler, requireAuthentication bool) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Return early if we are not authenticating
if !requireAuthentication {
inner(w, r, nil)
return
}
var user *influxdb.User
// TODO corylanou: never allow this in the future without users
if requireAuthentication && h.server.UserCount() > 0 {
username, password, err := parseCredentials(r)
if err != nil {
httpError(w, err.Error(), http.StatusUnauthorized)
return
@ -129,222 +398,81 @@ func (h *Handler) makeAuthenticationHandler(fn func(http.ResponseWriter, *http.R
return
}
}
fn(w, r, user)
}
inner(w, r, user)
})
}
// serveQuery parses an incoming query and, if valid, executes the query.
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, u *User) {
q := r.URL.Query()
p := influxql.NewParser(strings.NewReader(q.Get("q")))
db := q.Get("db")
// versionHeader taks a HTTP handler and returns a HTTP handler
// and adds the X-INFLUXBD-VERSION header to outgoing responses.
func versionHeader(inner http.Handler, version string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("X-Influxdb-Version", version)
inner.ServeHTTP(w, r)
})
}
// Parse query from query string.
query, err := p.ParseQuery()
if err != nil {
httpError(w, "error parsing query: "+err.Error(), http.StatusBadRequest)
return
}
// cors responds to incoming requests and adds the appropriate cors headers
// TODO: corylanou: add the ability to configure this in our config
func cors(inner http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if origin := r.Header.Get("Origin"); origin != "" {
w.Header().Set(`Access-Control-Allow-Origin`, origin)
w.Header().Set(`Access-Control-Allow-Methods`, strings.Join([]string{
`DELETE`,
`GET`,
`OPTIONS`,
`POST`,
`PUT`,
}, ", "))
// If authentication is enabled and there are no users yet, make sure
// the first statement is creating a new cluster admin.
if h.AuthenticationEnabled && h.server.UserCount() == 0 {
stmt, ok := query.Statements[0].(*influxql.CreateUserStatement)
if !ok || stmt.Privilege == nil || *stmt.Privilege != influxql.AllPrivileges {
httpError(w, "must create cluster admin", http.StatusUnauthorized)
return
w.Header().Set(`Access-Control-Allow-Headers`, strings.Join([]string{
`Accept`,
`Accept-Encoding`,
`Authorization`,
`Content-Length`,
`Content-Type`,
`X-CSRF-Token`,
`X-HTTP-Method-Override`,
}, ", "))
}
}
// Execute query. One result will return for each statement.
results := h.server.ExecuteQuery(query, db, u)
// Send results to client.
httpResults(w, results)
}
type batchWrite struct {
Points []Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
}
// serveWrite receives incoming series data and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, u *User) {
var br batchWrite
dec := json.NewDecoder(r.Body)
dec.UseNumber()
var writeError = func(result Result, statusCode int) {
w.WriteHeader(statusCode)
w.Header().Add("content-type", "application/json")
_ = json.NewEncoder(w).Encode(&result)
return
}
for {
if err := dec.Decode(&br); err != nil {
if err.Error() == "EOF" {
w.WriteHeader(http.StatusOK)
return
}
writeError(Result{Err: err}, http.StatusInternalServerError)
if r.Method == "OPTIONS" {
return
}
if br.Database == "" {
writeError(Result{Err: fmt.Errorf("database is required")}, http.StatusInternalServerError)
return
inner.ServeHTTP(w, r)
})
}
func requestID(inner http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
uid := uuid.NewUUID()
r.Header.Set("Request-Id", uid.String())
w.Header().Set("Request-Id", r.Header.Get("Request-Id"))
inner.ServeHTTP(w, r)
})
}
func logging(inner http.Handler, name string, weblog *log.Logger) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
l := &responseLogger{w: w}
inner.ServeHTTP(l, r)
logLine := buildLogLine(l, r, start)
weblog.Println(logLine)
})
}
func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
l := &responseLogger{w: w}
inner.ServeHTTP(l, r)
if err := recover(); err != nil {
logLine := buildLogLine(l, r, start)
logLine = fmt.Sprintf(`%s [err:%s]`, logLine, err)
weblog.Println(logLine)
}
if h.server.databases[br.Database] == nil {
writeError(Result{Err: fmt.Errorf("database not found: %q", br.Database)}, http.StatusNotFound)
return
}
// TODO corylanou: Check if user can write to specified database
//if !user_can_write(br.Database) {
//writeError(&Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", u.Name)}, http.StatusUnauthorized)
//return
//}
for _, p := range br.Points {
if p.Timestamp.IsZero() {
p.Timestamp = br.Timestamp
}
if len(br.Tags) > 0 {
for k, _ := range br.Tags {
if p.Tags[k] == "" {
p.Tags[k] = br.Tags[k]
}
}
}
if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []Point{p}); err != nil {
writeError(Result{Err: err}, http.StatusInternalServerError)
return
}
}
}
}
// serveMetastore returns a copy of the metastore.
func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request, u *User) {
// Set headers.
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", `attachment; filename="meta"`)
if err := h.server.CopyMetastore(w); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
// servePing returns a simple response to let the client know the server is running.
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request, u *User) {}
// serveDataNodes returns a list of all data nodes in the cluster.
func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request, u *User) {
// Generate a list of objects for encoding to the API.
a := make([]*dataNodeJSON, 0)
for _, n := range h.server.DataNodes() {
a = append(a, &dataNodeJSON{
ID: n.ID,
URL: n.URL.String(),
})
}
w.Header().Add("content-type", "application/json")
_ = json.NewEncoder(w).Encode(a)
}
// serveCreateDataNode creates a new data node in the cluster.
func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request, _ *User) {
// Read in data node from request body.
var n dataNodeJSON
if err := json.NewDecoder(r.Body).Decode(&n); err != nil {
httpError(w, err.Error(), http.StatusBadRequest)
return
}
// Parse the URL.
u, err := url.Parse(n.URL)
if err != nil {
httpError(w, "invalid data node url", http.StatusBadRequest)
return
}
// Create the data node.
if err := h.server.CreateDataNode(u); err == ErrDataNodeExists {
httpError(w, err.Error(), http.StatusConflict)
return
} else if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
// Retrieve data node reference.
node := h.server.DataNodeByURL(u)
// Create a new replica on the broker.
if err := h.server.client.CreateReplica(node.ID); err != nil {
httpError(w, err.Error(), http.StatusBadGateway)
return
}
// Write new node back to client.
w.WriteHeader(http.StatusCreated)
w.Header().Add("content-type", "application/json")
_ = json.NewEncoder(w).Encode(&dataNodeJSON{ID: node.ID, URL: node.URL.String()})
}
// serveDeleteDataNode removes an existing node.
func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request, u *User) {
// Parse node id.
nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64)
if err != nil {
httpError(w, "invalid node id", http.StatusBadRequest)
return
}
// Delete the node.
if err := h.server.DeleteDataNode(nodeID); err == ErrDataNodeNotFound {
httpError(w, err.Error(), http.StatusNotFound)
return
} else if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
type dataNodeJSON struct {
ID uint64 `json:"id"`
URL string `json:"url"`
}
// httpResult writes a Results array to the client.
func httpResults(w http.ResponseWriter, results Results) {
if results.Error() != nil {
if isAuthorizationError(results.Error()) {
w.WriteHeader(http.StatusUnauthorized)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
}
w.Header().Add("content-type", "application/json")
_ = json.NewEncoder(w).Encode(results)
}
// httpError writes an error to the client in a standard format.
func httpError(w http.ResponseWriter, error string, code int) {
results := Results{
&Result{Err: errors.New(error)},
}
w.Header().Add("content-type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(results)
})
}

View File

@ -1,4 +1,4 @@
package influxdb_test
package httpd_test
import (
"bytes"
@ -7,10 +7,13 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"testing"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/messaging"
)
func init() {
@ -309,7 +312,7 @@ func TestHandler_Ping(t *testing.T) {
status, _ := MustHTTP("GET", s.URL+`/ping`, nil, nil, "")
if status != http.StatusOK {
if status != http.StatusNoContent {
t.Fatalf("unexpected status: %d", status)
}
}
@ -780,20 +783,122 @@ func MustParseURL(s string) *url.URL {
// Server is a test HTTP server that wraps a handler
type HTTPServer struct {
*httptest.Server
Handler *influxdb.Handler
Handler *httpd.Handler
}
func NewHTTPServer(s *Server) *HTTPServer {
h := influxdb.NewHandler(s.Server)
h := httpd.NewHandler(s.Server, false, "X.X")
return &HTTPServer{httptest.NewServer(h), h}
}
func NewAuthenticatedHTTPServer(s *Server) *HTTPServer {
h := influxdb.NewHandler(s.Server)
h.AuthenticationEnabled = true
h := httpd.NewHandler(s.Server, true, "X.X")
return &HTTPServer{httptest.NewServer(h), h}
}
func (s *HTTPServer) Close() {
s.Server.Close()
}
// Server is a wrapping test struct for influxdb.Server.
type Server struct {
*influxdb.Server
}
// NewServer returns a new test server instance.
func NewServer() *Server {
return &Server{influxdb.NewServer()}
}
// OpenServer returns a new, open test server instance.
func OpenServer(client influxdb.MessagingClient) *Server {
s := OpenUninitializedServer(client)
if err := s.Initialize(&url.URL{Host: "127.0.0.1:8080"}); err != nil {
panic(err.Error())
}
return s
}
// OpenUninitializedServer returns a new, uninitialized, open test server instance.
func OpenUninitializedServer(client influxdb.MessagingClient) *Server {
s := NewServer()
if err := s.Open(tempfile()); err != nil {
panic(err.Error())
}
if err := s.SetClient(client); err != nil {
panic(err.Error())
}
return s
}
// TODO corylanou: evaluate how much of this should be in this package
// vs. how much should be a mocked out interface
// MessagingClient represents a test client for the messaging broker.
type MessagingClient struct {
index uint64
c chan *messaging.Message
PublishFunc func(*messaging.Message) (uint64, error)
CreateReplicaFunc func(replicaID uint64) error
DeleteReplicaFunc func(replicaID uint64) error
SubscribeFunc func(replicaID, topicID uint64) error
UnsubscribeFunc func(replicaID, topicID uint64) error
}
// NewMessagingClient returns a new instance of MessagingClient.
func NewMessagingClient() *MessagingClient {
c := &MessagingClient{c: make(chan *messaging.Message, 1)}
c.PublishFunc = c.send
c.CreateReplicaFunc = func(replicaID uint64) error { return nil }
c.DeleteReplicaFunc = func(replicaID uint64) error { return nil }
c.SubscribeFunc = func(replicaID, topicID uint64) error { return nil }
c.UnsubscribeFunc = func(replicaID, topicID uint64) error { return nil }
return c
}
// Publish attaches an autoincrementing index to the message.
// This function also execute's the client's PublishFunc mock function.
func (c *MessagingClient) Publish(m *messaging.Message) (uint64, error) {
c.index++
m.Index = c.index
return c.PublishFunc(m)
}
// send sends the message through to the channel.
// This is the default value of PublishFunc.
func (c *MessagingClient) send(m *messaging.Message) (uint64, error) {
c.c <- m
return m.Index, nil
}
// Creates a new replica with a given ID on the broker.
func (c *MessagingClient) CreateReplica(replicaID uint64) error {
return c.CreateReplicaFunc(replicaID)
}
// Deletes an existing replica with a given ID from the broker.
func (c *MessagingClient) DeleteReplica(replicaID uint64) error {
return c.DeleteReplicaFunc(replicaID)
}
// Subscribe adds a subscription to a replica for a topic on the broker.
func (c *MessagingClient) Subscribe(replicaID, topicID uint64) error {
return c.SubscribeFunc(replicaID, topicID)
}
// Unsubscribe removes a subscrition from a replica for a topic on the broker.
func (c *MessagingClient) Unsubscribe(replicaID, topicID uint64) error {
return c.UnsubscribeFunc(replicaID, topicID)
}
// C returns a channel for streaming message.
func (c *MessagingClient) C() <-chan *messaging.Message { return c.c }
// tempfile returns a temporary path.
func tempfile() string {
f, _ := ioutil.TempFile("", "influxdb-")
path := f.Name()
f.Close()
os.Remove(path)
return path
}

130
httpd/response_logger.go Normal file
View File

@ -0,0 +1,130 @@
package httpd
import (
"encoding/base64"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"time"
)
type loggingResponseWriter interface {
http.ResponseWriter
Status() int
Size() int
}
// responseLogger is wrapper of http.ResponseWriter that keeps track of its HTTP status
// code and body size
type responseLogger struct {
w http.ResponseWriter
status int
size int
}
func (l *responseLogger) Header() http.Header {
return l.w.Header()
}
func (l *responseLogger) Write(b []byte) (int, error) {
if l.status == 0 {
// Set status if WriteHeader has not been called
l.status = http.StatusOK
}
size, err := l.w.Write(b)
l.size += size
return size, err
}
func (l *responseLogger) WriteHeader(s int) {
l.w.WriteHeader(s)
l.status = s
}
func (l *responseLogger) Status() int {
return l.status
}
func (l *responseLogger) Size() int {
return l.size
}
// Common Log Format: http://en.wikipedia.org/wiki/Common_Log_Format
// buildLogLine creates a common log format
// in addittion to the common fields, we also append referrer, user agent and request ID
func buildLogLine(l *responseLogger, r *http.Request, start time.Time) string {
username := parseUsername(r)
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
host = r.RemoteAddr
}
uri := r.URL.RequestURI()
referer := r.Referer()
userAgent := r.UserAgent()
fields := []string{
host,
"-",
detect(username, "-"),
fmt.Sprintf("[%s]", start.Format("02/Jan/2006:15:04:05 -0700")),
r.Method,
uri,
r.Proto,
detect(strconv.Itoa(l.Status()), "-"),
strconv.Itoa(l.Size()),
detect(referer, "-"),
detect(userAgent, "-"),
r.Header.Get("Request-Id"),
}
return strings.Join(fields, " ")
}
// detect detects the first presense of a non blank string and returns it
func detect(values ...string) string {
for _, v := range values {
if v != "" {
return v
}
}
return ""
}
// parses the uesrname either from the url or auth header
func parseUsername(r *http.Request) string {
var (
username = ""
url = r.URL
)
// get username from the url if passed there
if url.User != nil {
if name := url.User.Username(); name != "" {
username = name
}
}
// Try to get it from the authorization header if set there
if username == "" {
auth := r.Header.Get("Authorization")
fields := strings.Split(auth, " ")
if len(fields) == 2 {
bs, err := base64.StdEncoding.DecodeString(fields[1])
if err == nil {
fields = strings.Split(string(bs), ":")
if len(fields) >= 1 {
username = fields[0]
}
}
}
}
return username
}

View File

@ -233,7 +233,7 @@ func (s *CreateDatabaseStatement) String() string {
// RequiredPrivilege returns the privilege required to execute a CreateDatabaseStatement.
func (s *CreateDatabaseStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},}
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// DropDatabaseStatement represents a command to drop a database.
@ -252,7 +252,7 @@ func (s *DropDatabaseStatement) String() string {
// RequiredPrivilege returns the privilege required to execute a DropDatabaseStatement.
func (s *DropDatabaseStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},}
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// DropRetentionPolicyStatement represents a command to drop a retention policy from a database.
@ -276,7 +276,7 @@ func (s *DropRetentionPolicyStatement) String() string {
// RequiredPrivilege returns the privilege required to execute a DropRetentionPolicyStatement.
func (s *DropRetentionPolicyStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: s.Database, Privilege: WritePrivilege,},}
return ExecutionPrivileges{{Name: s.Database, Privilege: WritePrivilege}}
}
// CreateUserStatement represents a command for creating a new user.
@ -309,7 +309,7 @@ func (s *CreateUserStatement) String() string {
// RequiredPrivilege returns the privilege(s) required to execute a CreateUserStatement.
func (s *CreateUserStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},}
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// DropUserStatement represents a command for dropping a user.
@ -328,7 +328,7 @@ func (s *DropUserStatement) String() string {
// RequiredPrivilege returns the privilege(s) required to execute a DropUserStatement.
func (s *DropUserStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},}
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// Privilege is a type of action a user can be granted the right to use.
@ -387,7 +387,7 @@ func (s *GrantStatement) String() string {
// RequiredPrivilege returns the privilege required to execute a GrantStatement.
func (s *GrantStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},}
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// RevokeStatement represents a command to revoke a privilege from a user.
@ -418,7 +418,7 @@ func (s *RevokeStatement) String() string {
// RequiredPrivilege returns the privilege required to execute a RevokeStatement.
func (s *RevokeStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},}
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// CreateRetentionPolicyStatement represents a command to create a retention policy.
@ -458,7 +458,7 @@ func (s *CreateRetentionPolicyStatement) String() string {
// RequiredPrivilege returns the privilege required to execute a CreateRetentionPolicyStatement.
func (s *CreateRetentionPolicyStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},}
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// AlterRetentionPolicyStatement represents a command to alter an existing retention policy.
@ -506,7 +506,7 @@ func (s *AlterRetentionPolicyStatement) String() string {
// RequiredPrivilege returns the privilege required to execute an AlterRetentionPolicyStatement.
func (s *AlterRetentionPolicyStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},}
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// SelectStatement represents a command for extracting data from the database.
@ -566,10 +566,10 @@ func (s *SelectStatement) String() string {
// RequiredPrivilege returns the privilege required to execute the SelectStatement.
func (s *SelectStatement) RequiredPrivileges() ExecutionPrivileges {
ep := ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},}
ep := ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}}
if s.Target != nil {
p := ExecutionPrivilege{Name: s.Target.Database, Privilege: WritePrivilege,}
p := ExecutionPrivilege{Name: s.Target.Database, Privilege: WritePrivilege}
ep = append(ep, p)
}
return ep
@ -759,7 +759,7 @@ func (s *DeleteStatement) String() string {
// RequiredPrivilege returns the privilege required to execute a DeleteStatement.
func (s *DeleteStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege,},}
return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege}}
}
// ListSeriesStatement represents a command for listing series in the database.
@ -797,7 +797,7 @@ func (s *ListSeriesStatement) String() string {
// RequiredPrivilege returns the privilege required to execute a ListSeriesStatement.
func (s *ListSeriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},}
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}}
}
// DropSeriesStatement represents a command for removing a series from the database.
@ -810,7 +810,7 @@ func (s *DropSeriesStatement) String() string { return fmt.Sprintf("DROP SERIES
// RequiredPrivilege returns the privilige reqired to execute a DropSeriesStatement.
func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege,},}
return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege}}
}
// ListContinuousQueriesStatement represents a command for listing continuous queries.
@ -821,7 +821,7 @@ func (s *ListContinuousQueriesStatement) String() string { return "LIST CONTINUO
// RequiredPrivilege returns the privilege required to execute a ListContinuousQueriesStatement.
func (s *ListContinuousQueriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},}
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}}
}
// ListDatabasesStatement represents a command for listing all databases in the cluster.
@ -832,7 +832,7 @@ func (s *ListDatabasesStatement) String() string { return "LIST DATABASES" }
// RequiredPrivilege returns the privilege required to execute a ListDatabasesStatement
func (s *ListDatabasesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},}
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// CreateContinuousQueriesStatement represents a command for creating a continuous query.
@ -854,7 +854,7 @@ func (s *CreateContinuousQueryStatement) String() string {
// RequiredPrivilege returns the privilege required to execute a CreateContinuousQueryStatement.
func (s *CreateContinuousQueryStatement) RequiredPrivileges() ExecutionPrivileges {
ep := ExecutionPrivileges{{Name: s.Database, Privilege: ReadPrivilege,},}
ep := ExecutionPrivileges{{Name: s.Database, Privilege: ReadPrivilege}}
// Selecting into a database that's different from the source?
if s.Source.Target.Database != "" {
@ -863,7 +863,7 @@ func (s *CreateContinuousQueryStatement) RequiredPrivileges() ExecutionPrivilege
// Add destination database privilege requirement and set it to write.
p := ExecutionPrivilege{
Name: s.Source.Target.Database,
Name: s.Source.Target.Database,
Privilege: WritePrivilege,
}
ep = append(ep, p)
@ -884,7 +884,7 @@ func (s *DropContinuousQueryStatement) String() string {
// RequiredPrivileges returns the privilege(s) required to execute a DropContinuousQueryStatement
func (s *DropContinuousQueryStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege,},}
return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege}}
}
// ListMeasurementsStatement represents a command for listing measurements.
@ -922,7 +922,7 @@ func (s *ListMeasurementsStatement) String() string {
// RequiredPrivileges returns the privilege(s) required to execute a ListMeasurementsStatement
func (s *ListMeasurementsStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},}
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}}
}
// ListRetentionPoliciesStatement represents a command for listing retention policies.
@ -941,7 +941,7 @@ func (s *ListRetentionPoliciesStatement) String() string {
// RequiredPrivileges returns the privilege(s) required to execute a ListRetentionPoliciesStatement
func (s *ListRetentionPoliciesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},}
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}}
}
// ListTagKeysStatement represents a command for listing tag keys.
@ -986,7 +986,7 @@ func (s *ListTagKeysStatement) String() string {
// RequiredPrivileges returns the privilege(s) required to execute a ListTagKeysStatement
func (s *ListTagKeysStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},}
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}}
}
// ListTagValuesStatement represents a command for listing tag values.
@ -1031,7 +1031,7 @@ func (s *ListTagValuesStatement) String() string {
// RequiredPrivileges returns the privilege(s) required to execute a ListTagValuesStatement
func (s *ListTagValuesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},}
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}}
}
// ListUsersStatement represents a command for listing users.
@ -1044,7 +1044,7 @@ func (s *ListUsersStatement) String() string {
// RequiredPrivileges returns the privilege(s) required to execute a ListUsersStatement
func (s *ListUsersStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges,},}
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// ListFieldKeyStatement represents a command for listing field keys.
@ -1089,7 +1089,7 @@ func (s *ListFieldKeysStatement) String() string {
// RequiredPrivileges returns the privilege(s) required to execute a ListFieldKeysStatement
func (s *ListFieldKeysStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},}
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}}
}
// ListFieldValuesStatement represents a command for listing field values.
@ -1134,7 +1134,7 @@ func (s *ListFieldValuesStatement) String() string {
// RequiredPrivileges returns the privilege(s) required to execute a ListFieldValuesStatement
func (s *ListFieldValuesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},}
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}}
}
// Fields represents a list of fields.

View File

@ -332,6 +332,13 @@ func (s *Server) Initialize(u *url.URL) error {
return nil
}
// This is the same struct we use in the httpd package, but
// it seems overkill to export it and share it
type dataNodeJSON struct {
ID uint64 `json:"id"`
URL string `json:"url"`
}
// Join creates a new data node in an existing cluster, copies the metastore,
// and initializes the ID.
func (s *Server) Join(u *url.URL, joinURL *url.URL) error {