Add authorization unit tests & code review fixes
parent
1796998614
commit
2d34c71c0c
68
handler.go
68
handler.go
|
@ -3,6 +3,7 @@ package influxdb
|
|||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
@ -111,20 +112,20 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
func (h *Handler) makeAuthenticationHandler(fn func(http.ResponseWriter, *http.Request, *User)) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
var user *User
|
||||
if h.AuthenticationEnabled && h.server.UsersLen() > 0 {
|
||||
if h.AuthenticationEnabled && h.server.UserCount() > 0 {
|
||||
username, password, err := getUsernameAndPassword(r)
|
||||
if err != nil {
|
||||
h.error(w, err.Error(), http.StatusUnauthorized)
|
||||
httpError(w, err.Error(), http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
if username == "" {
|
||||
h.error(w, "username required", http.StatusUnauthorized)
|
||||
httpError(w, "username required", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
user, err = h.server.Authenticate(username, password)
|
||||
if err != nil {
|
||||
h.error(w, err.Error(), http.StatusUnauthorized)
|
||||
httpError(w, err.Error(), http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -141,16 +142,16 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, u *User) {
|
|||
// Parse query from query string.
|
||||
query, err := p.ParseQuery()
|
||||
if err != nil {
|
||||
h.error(w, "error parsing query: "+err.Error(), http.StatusBadRequest)
|
||||
httpError(w, "error parsing query: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// If authentication is enabled and there are no users yet, make sure
|
||||
// the first statement is creating a new cluster admin.
|
||||
if h.AuthenticationEnabled && h.server.UsersLen() == 0 {
|
||||
if h.AuthenticationEnabled && h.server.UserCount() == 0 {
|
||||
stmt, ok := query.Statements[0].(*influxql.CreateUserStatement)
|
||||
if !ok || stmt.Privilege == nil || *stmt.Privilege != influxql.AllPrivileges {
|
||||
h.error(w, "must create cluster admin", http.StatusBadRequest)
|
||||
httpError(w, "must create cluster admin", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -158,13 +159,8 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, u *User) {
|
|||
// Execute query. One result will return for each statement.
|
||||
results := h.server.ExecuteQuery(query, db, u)
|
||||
|
||||
// If any statement errored then set the response status code.
|
||||
if results.Error() != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
// Write resultset.
|
||||
_ = json.NewEncoder(w).Encode(results)
|
||||
// Send results to client.
|
||||
httpResults(w, results)
|
||||
}
|
||||
|
||||
type batchWrite struct {
|
||||
|
@ -241,7 +237,7 @@ func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request, u *User
|
|||
w.Header().Set("Content-Disposition", `attachment; filename="meta"`)
|
||||
|
||||
if err := h.server.CopyMetastore(w); err != nil {
|
||||
h.error(w, err.Error(), http.StatusInternalServerError)
|
||||
httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -268,23 +264,23 @@ func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request, _
|
|||
// Read in data node from request body.
|
||||
var n dataNodeJSON
|
||||
if err := json.NewDecoder(r.Body).Decode(&n); err != nil {
|
||||
h.error(w, err.Error(), http.StatusBadRequest)
|
||||
httpError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse the URL.
|
||||
u, err := url.Parse(n.URL)
|
||||
if err != nil {
|
||||
h.error(w, "invalid data node url", http.StatusBadRequest)
|
||||
httpError(w, "invalid data node url", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Create the data node.
|
||||
if err := h.server.CreateDataNode(u); err == ErrDataNodeExists {
|
||||
h.error(w, err.Error(), http.StatusConflict)
|
||||
httpError(w, err.Error(), http.StatusConflict)
|
||||
return
|
||||
} else if err != nil {
|
||||
h.error(w, err.Error(), http.StatusInternalServerError)
|
||||
httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -293,7 +289,7 @@ func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request, _
|
|||
|
||||
// Create a new replica on the broker.
|
||||
if err := h.server.client.CreateReplica(node.ID); err != nil {
|
||||
h.error(w, err.Error(), http.StatusBadGateway)
|
||||
httpError(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -308,16 +304,16 @@ func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request, u
|
|||
// Parse node id.
|
||||
nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64)
|
||||
if err != nil {
|
||||
h.error(w, "invalid node id", http.StatusBadRequest)
|
||||
httpError(w, "invalid node id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Delete the node.
|
||||
if err := h.server.DeleteDataNode(nodeID); err == ErrDataNodeNotFound {
|
||||
h.error(w, err.Error(), http.StatusNotFound)
|
||||
httpError(w, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
} else if err != nil {
|
||||
h.error(w, err.Error(), http.StatusInternalServerError)
|
||||
httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -329,8 +325,26 @@ type dataNodeJSON struct {
|
|||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
// error returns an error to the client in a standard format.
|
||||
func (h *Handler) error(w http.ResponseWriter, error string, code int) {
|
||||
// TODO: Return error as JSON.
|
||||
http.Error(w, error, code)
|
||||
// httpResult writes a Results array to the client.
|
||||
func httpResults(w http.ResponseWriter, results Results) {
|
||||
if results.Error() != nil {
|
||||
if isAuthorizationError(results.Error()) {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
w.Header().Add("content-type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(results)
|
||||
}
|
||||
|
||||
// httpError writes an error to the client in a standard format.
|
||||
func httpError(w http.ResponseWriter, error string, code int) {
|
||||
results := Results{
|
||||
&Result{Err: errors.New(error)},
|
||||
}
|
||||
|
||||
w.Header().Add("content-type", "application/json")
|
||||
w.WriteHeader(code)
|
||||
_ = json.NewEncoder(w).Encode(results)
|
||||
}
|
||||
|
|
|
@ -385,7 +385,7 @@ func TestHandler_CreateUser_BadRequest(t *testing.T) {
|
|||
status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "")
|
||||
if status != http.StatusBadRequest {
|
||||
t.Fatalf("unexpected status: %d", status)
|
||||
} else if body != "error parsing query: found 0, expected identifier at line 1, char 13" {
|
||||
} else if body != `[{"error":"error parsing query: found 0, expected identifier at line 1, char 13"}]` {
|
||||
t.Fatalf("unexpected body: %s", body)
|
||||
}
|
||||
}
|
||||
|
@ -399,7 +399,7 @@ func TestHandler_CreateUser_BadRequest_NoName(t *testing.T) {
|
|||
status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "")
|
||||
if status != http.StatusBadRequest {
|
||||
t.Fatalf("unexpected status: %d", status)
|
||||
} else if body != "error parsing query: found WITH, expected identifier at line 1, char 13" {
|
||||
} else if body != `[{"error":"error parsing query: found WITH, expected identifier at line 1, char 13"}]` {
|
||||
t.Fatalf("unexpected body: %s", body)
|
||||
}
|
||||
}
|
||||
|
@ -413,7 +413,7 @@ func TestHandler_CreateUser_BadRequest_NoPassword(t *testing.T) {
|
|||
status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "")
|
||||
if status != http.StatusBadRequest {
|
||||
t.Fatalf("unexpected status: %d", status)
|
||||
} else if body != "error parsing query: found EOF, expected WITH at line 1, char 18" {
|
||||
} else if body != `[{"error":"error parsing query: found EOF, expected WITH at line 1, char 18"}]` {
|
||||
t.Fatalf("unexpected body: %s", body)
|
||||
}
|
||||
}
|
||||
|
|
21
influxdb.go
21
influxdb.go
|
@ -105,6 +105,27 @@ var (
|
|||
ErrNotExecuted = errors.New("not executed")
|
||||
)
|
||||
|
||||
// ErrAuthorize represents an authorization error.
|
||||
type ErrAuthorize struct {
|
||||
text string
|
||||
}
|
||||
|
||||
// Error returns the text of the error.
|
||||
func (e *ErrAuthorize) Error() string {
|
||||
return e.text
|
||||
}
|
||||
|
||||
// authorize satisfies isAuthorizationError
|
||||
func (_ ErrAuthorize) authorize() {}
|
||||
|
||||
func isAuthorizationError(err error) bool {
|
||||
type authorize interface {
|
||||
authorize()
|
||||
}
|
||||
_, ok := err.(authorize)
|
||||
return ok
|
||||
}
|
||||
|
||||
// mustMarshal encodes a value to JSON.
|
||||
// This will panic if an error occurs. This should only be used internally when
|
||||
// an invalid marshal will cause corruption and a panic is appropriate.
|
||||
|
|
|
@ -566,7 +566,6 @@ func (s *SelectStatement) String() string {
|
|||
|
||||
// RequiredPrivilege returns the privilege required to execute the SelectStatement.
|
||||
func (s *SelectStatement) RequiredPrivileges() ExecutionPrivileges {
|
||||
|
||||
ep := ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege,},}
|
||||
|
||||
if s.Target != nil {
|
||||
|
@ -855,7 +854,22 @@ func (s *CreateContinuousQueryStatement) String() string {
|
|||
|
||||
// RequiredPrivilege returns the privilege required to execute a CreateContinuousQueryStatement.
|
||||
func (s *CreateContinuousQueryStatement) RequiredPrivileges() ExecutionPrivileges {
|
||||
return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege,},}
|
||||
ep := ExecutionPrivileges{{Name: s.Database, Privilege: ReadPrivilege,},}
|
||||
|
||||
// Selecting into a database that's different from the source?
|
||||
if s.Source.Target.Database != "" {
|
||||
// Change source database privilege requirement to read.
|
||||
ep[0].Privilege = ReadPrivilege
|
||||
|
||||
// Add destination database privilege requirement and set it to write.
|
||||
p := ExecutionPrivilege{
|
||||
Name: s.Source.Target.Database,
|
||||
Privilege: WritePrivilege,
|
||||
}
|
||||
ep = append(ep, p)
|
||||
}
|
||||
|
||||
return ep
|
||||
}
|
||||
|
||||
// DropContinuousQueriesStatement represents a command for removing a continuous query.
|
||||
|
|
25
server.go
25
server.go
|
@ -278,6 +278,7 @@ func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, er
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Wait for the server to receive the message.
|
||||
err = s.Sync(index)
|
||||
|
||||
|
@ -796,8 +797,8 @@ func (s *Server) Users() (a []*User) {
|
|||
return a
|
||||
}
|
||||
|
||||
// UsersLen returns the number of users.
|
||||
func (s *Server) UsersLen() int {
|
||||
// UserCount returns the number of users.
|
||||
func (s *Server) UserCount() int {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return len(s.users)
|
||||
|
@ -858,9 +859,10 @@ func (s *Server) applyCreateUser(m *messaging.Message) (err error) {
|
|||
|
||||
// Create the user.
|
||||
u := &User{
|
||||
Name: c.Username,
|
||||
Hash: string(hash),
|
||||
Admin: c.Admin,
|
||||
Name: c.Username,
|
||||
Hash: string(hash),
|
||||
Privileges: make(map[string]influxql.Privilege),
|
||||
Admin: c.Admin,
|
||||
}
|
||||
|
||||
// Persist to metastore.
|
||||
|
@ -1968,6 +1970,7 @@ func (p dataNodes) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
|||
|
||||
// Authorize user u to execute query q on database.
|
||||
// database can be "" for queries that do not require a database.
|
||||
// If u is nil, this means authorization is disabled.
|
||||
func Authorize(u *User, q *influxql.Query, database string) error {
|
||||
// Cluster admins can do anything.
|
||||
if u == nil || u.Admin {
|
||||
|
@ -1991,14 +1994,16 @@ func Authorize(u *User, q *influxql.Query, database string) error {
|
|||
}
|
||||
|
||||
// Check if user has required privilege.
|
||||
if !u.Authorize(p.Privilege, database) {
|
||||
if !u.Authorize(p.Privilege, dbname) {
|
||||
var msg string
|
||||
if database != "" {
|
||||
if dbname == "" {
|
||||
msg = "requires cluster admin"
|
||||
} else {
|
||||
msg = fmt.Sprintf("requires %s privilege on %s", p.Privilege.String(), database)
|
||||
msg = fmt.Sprintf("requires %s privilege on %s", p.Privilege.String(), dbname)
|
||||
}
|
||||
return &ErrAuthorize{
|
||||
text: fmt.Sprintf("%s not authorized to execute '%s'. %s", u.Name, stmt.String(), msg),
|
||||
}
|
||||
return fmt.Errorf("%s not authorized to execute %s. %s", u.Name, stmt.String(), msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2014,7 +2019,7 @@ var BcryptCost = 10
|
|||
type User struct {
|
||||
Name string `json:"name"`
|
||||
Hash string `json:"hash"`
|
||||
Privileges map[string]influxql.Privilege // db name to privilege
|
||||
Privileges map[string]influxql.Privilege `json:"privileges"` // db name to privilege
|
||||
Admin bool `json:"admin,omitempty"`
|
||||
}
|
||||
|
||||
|
|
147
server_test.go
147
server_test.go
|
@ -95,6 +95,153 @@ func TestServer_DeleteDataNode(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Test user privilege authorization.
|
||||
func TestServer_UserPrivilegeAuthorization(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
defer s.Close()
|
||||
|
||||
// Create cluster admin.
|
||||
s.CreateUser("admin", "admin", true)
|
||||
admin := s.User("admin")
|
||||
|
||||
// Create normal database user.
|
||||
s.CreateUser("user1", "user1", false)
|
||||
user1 := s.User("user1")
|
||||
user1.Privileges["foo"] = influxql.ReadPrivilege
|
||||
|
||||
s.Restart()
|
||||
|
||||
// admin user should be authorized for all privileges.
|
||||
if !admin.Authorize(influxql.AllPrivileges, "") {
|
||||
t.Fatalf("cluster admin doesn't have influxql.AllPrivileges")
|
||||
} else if !admin.Authorize(influxql.WritePrivilege, "") {
|
||||
t.Fatalf("cluster admin doesn't have influxql.WritePrivilege")
|
||||
}
|
||||
|
||||
// Normal user with only read privilege on database foo.
|
||||
if !user1.Authorize(influxql.ReadPrivilege, "foo") {
|
||||
t.Fatalf("user1 doesn't have influxql.ReadPrivilege on foo")
|
||||
} else if user1.Authorize(influxql.WritePrivilege, "foo") {
|
||||
t.Fatalf("user1 has influxql.WritePrivilege on foo")
|
||||
} else if user1.Authorize(influxql.ReadPrivilege, "bar") {
|
||||
t.Fatalf("user1 has influxql.ReadPrivilege on bar")
|
||||
} else if user1.Authorize(influxql.AllPrivileges, "") {
|
||||
t.Fatalf("user1 is cluster admin")
|
||||
}
|
||||
}
|
||||
|
||||
// Test single statement query authorization.
|
||||
func TestServer_SingleStatementQueryAuthorization(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
defer s.Close()
|
||||
|
||||
// Create cluster admin.
|
||||
s.CreateUser("admin", "admin", true)
|
||||
admin := s.User("admin")
|
||||
|
||||
// Create normal database user.
|
||||
s.CreateUser("user", "user", false)
|
||||
user := s.User("user")
|
||||
user.Privileges["foo"] = influxql.ReadPrivilege
|
||||
|
||||
s.Restart()
|
||||
|
||||
// Create a query that only cluster admins can run.
|
||||
adminOnlyQuery := &influxql.Query{
|
||||
Statements: []influxql.Statement{
|
||||
&influxql.DropDatabaseStatement{Name: "foo"},
|
||||
},
|
||||
}
|
||||
|
||||
// Create a query that requires read on one db and write on another.
|
||||
readWriteQuery := &influxql.Query{
|
||||
Statements: []influxql.Statement{
|
||||
&influxql.CreateContinuousQueryStatement{
|
||||
Name: "myquery",
|
||||
Database: "foo",
|
||||
Source: &influxql.SelectStatement{
|
||||
Fields: []*influxql.Field{&influxql.Field{Expr: &influxql.Call{Name: "count"}}},
|
||||
Target: &influxql.Target{Measurement: "measure1", Database: "bar"},
|
||||
Source: &influxql.Measurement{Name: "myseries"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// admin user should be authorized to execute any query.
|
||||
if err := influxdb.Authorize(admin, adminOnlyQuery, ""); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := influxdb.Authorize(admin, readWriteQuery, "foo"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Normal user should not be authorized to execute admin only query.
|
||||
if err := influxdb.Authorize(user, adminOnlyQuery, ""); err == nil {
|
||||
t.Fatalf("normal user should not be authorized to execute cluster admin level queries")
|
||||
}
|
||||
|
||||
// Normal user should not be authorized to execute query that selects into another
|
||||
// database which (s)he doesn't have privileges on.
|
||||
if err := influxdb.Authorize(user, readWriteQuery, ""); err == nil {
|
||||
t.Fatalf("normal user should not be authorized to write to database bar")
|
||||
}
|
||||
|
||||
// Grant normal user write privileges on database "bar".
|
||||
user.Privileges["bar"] = influxql.WritePrivilege
|
||||
|
||||
//Authorization on the previous query should now succeed.
|
||||
if err := influxdb.Authorize(user, readWriteQuery, ""); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Test multiple statement query authorization.
|
||||
func TestServer_MultiStatementQueryAuthorization(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
defer s.Close()
|
||||
|
||||
// Create cluster admin.
|
||||
s.CreateUser("admin", "admin", true)
|
||||
admin := s.User("admin")
|
||||
|
||||
// Create normal database user.
|
||||
s.CreateUser("user", "user", false)
|
||||
user := s.User("user")
|
||||
user.Privileges["foo"] = influxql.ReadPrivilege
|
||||
|
||||
s.Restart()
|
||||
|
||||
// Create a query that requires read for one statement and write for the second.
|
||||
readWriteQuery := &influxql.Query{
|
||||
Statements: []influxql.Statement{
|
||||
// Statement that requires read.
|
||||
&influxql.SelectStatement{
|
||||
Fields: []*influxql.Field{&influxql.Field{Expr: &influxql.Call{Name: "count"}}},
|
||||
Source: &influxql.Measurement{Name: "cpu"},
|
||||
},
|
||||
|
||||
// Statement that requires write.
|
||||
&influxql.SelectStatement{
|
||||
Fields: []*influxql.Field{&influxql.Field{Expr: &influxql.Call{Name: "count"}}},
|
||||
Source: &influxql.Measurement{Name: "cpu"},
|
||||
Target: &influxql.Target{Measurement: "tmp"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Admin should be authorized to execute both statements in the query.
|
||||
if err := influxdb.Authorize(admin, readWriteQuery, "foo"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Normal user with only read privileges should not be authorized to execute both statements.
|
||||
if err := influxdb.Authorize(user, readWriteQuery, "foo"); err == nil {
|
||||
t.Fatalf("user should not be authorized to execute both statements")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the server can create a database.
|
||||
func TestServer_CreateDatabase(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
|
|
Loading…
Reference in New Issue