Merge branch 'master' into graphite_distinct_servers

Conflicts:
	cmd/influxd/config.go
pull/1294/head
Philip O'Toole 2015-01-08 12:31:21 -08:00
commit c64624abff
14 changed files with 1392 additions and 112 deletions

View File

@ -1,5 +1,47 @@
The top level name is called a measurement. These names can contain any characters. Then there are field names, field values, tag keys and tag values, which can also contain any characters. Because of this, anywhere a measurement name, field name, field value, tag name, or tag value appears should be able to get wrapped in double quotes to deal with special characters.
# Databases & retention policies
```sql
-- create a database
CREATE DATABASE <name>
-- create a retention policy
CREATE RETENTION POLICY <rp-name> ON <db-name> DURATION <duration> REPLICATION <n> [DEFAULT]
-- alter retention policy
ALTER RETENTION POLICY <rp-name> ON <db-name> (DURATION <duration> | REPLICATION <n> | DEFAULT)+
-- drop a database
DROP DATABASE <name>
```
# Users and permissions
```sql
-- create user
CREATE USER <name> WITH PASSWORD <password>
-- grant privilege on a database
GRANT <privilege> ON <db> TO <user>
-- grant cluster admin privileges
GRANT ALL [PRIVILEGES] TO <user>
-- revoke privilege
REVOKE <privilege> ON <db> FROM <user>
-- revoke all privileges for a DB
REVOKE ALL [PRIVILEGES] ON <db> FROM <user>
-- revoke all of user's privileges (all DBs and/or cluster admin)
REVOKE ALL [PRIVILEGES] FROM <user>
-- delete a user
DROP USER <name>
```
<privilege> := READ | WRITE | All [PRIVILEGES]
# Select
```sql

View File

@ -48,6 +48,10 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("logging level mismatch: %v", c.Logging.Level)
}
if !c.Authentication.Enabled {
t.Fatalf("authentication enabled mismatch: %v", c.Authentication.Enabled)
}
if c.Admin.Port != 8083 {
t.Fatalf("admin port mismatch: %v", c.Admin.Port)
} else if c.Admin.Assets != "./admin" {
@ -142,6 +146,10 @@ const testFile = `
# that can be resolved here.
hostname = "myserver.com"
# Control authentication
[authentication]
enabled = true
[logging]
# logging level can be one of "debug", "info", "warn" or "error"
level = "info"

View File

@ -84,6 +84,8 @@ func execRun(args []string) {
// Start the server handler.
// If it uses the same port as the broker then simply attach it.
sh := influxdb.NewHandler(s)
sh.AuthenticationEnabled = config.Authentication.Enabled
if config.BrokerListenAddr() == config.ApiHTTPListenAddr() {
h.serverHandler = sh
} else {

View File

@ -15,6 +15,14 @@ bind-address = "0.0.0.0"
# Change this option to true to disable reporting.
reporting-disabled = false
# Control authentication
# If not set authetication is DISABLED. Be sure to explicitly set this flag to
# true if you want authentication. If authentication is enabled, and no administrative
# user exists in the system, the system will allow one administrative user to be
# created without requiring any authentication.
[authentication]
enabled = false
[logging]
# logging level can be one of "fine", "debug", "info", "warn" or "error"
level = "info"

View File

@ -1,7 +1,9 @@
package influxdb
import (
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"
@ -16,11 +18,42 @@ import (
// TODO: Check HTTP response codes: 400, 401, 403, 409.
// getUsernameAndPassword returns the username and password encoded in
// a request. The credentials may be present as URL query params, or as
// a Basic Authentication header.
func getUsernameAndPassword(r *http.Request) (string, string, error) {
q := r.URL.Query()
username, password := q.Get("u"), q.Get("p")
if username != "" && password != "" {
return username, password, nil
}
auth := r.Header.Get("Authorization")
if auth == "" {
return "", "", nil
}
fields := strings.Split(auth, " ")
if len(fields) != 2 {
return "", "", fmt.Errorf("invalid Basic Authentication header")
}
bs, err := base64.StdEncoding.DecodeString(fields[1])
if err != nil {
return "", "", fmt.Errorf("invalid Base64 encoding")
}
fields = strings.Split(string(bs), ":")
if len(fields) != 2 {
return "", "", fmt.Errorf("invalid Basic Authentication value")
}
return fields[0], fields[1], nil
}
// Handler represents an HTTP handler for the InfluxDB server.
type Handler struct {
server *Server
mux *pat.PatternServeMux
// Whether endpoints require authentication.
AuthenticationEnabled bool
// The InfluxDB verion returned by the HTTP response header.
Version string
}
@ -36,37 +69,37 @@ func NewHandler(s *Server) *Handler {
h.mux.Get("/authenticate", http.HandlerFunc(h.serveAuthenticate))
// User routes.
h.mux.Get("/users", http.HandlerFunc(h.serveUsers))
h.mux.Post("/users", http.HandlerFunc(h.serveCreateUser))
h.mux.Put("/users/:user", http.HandlerFunc(h.serveUpdateUser))
h.mux.Del("/users/:user", http.HandlerFunc(h.serveDeleteUser))
h.mux.Get("/users", h.makeAuthenticationHandler(h.serveUsers))
h.mux.Post("/users", http.HandlerFunc(h.serveCreateUser)) // Non-standard authentication
h.mux.Put("/users/:user", h.makeAuthenticationHandler(h.serveUpdateUser))
h.mux.Del("/users/:user", h.makeAuthenticationHandler(h.serveDeleteUser))
// Database routes
h.mux.Get("/db", http.HandlerFunc(h.serveDatabases))
h.mux.Post("/db", http.HandlerFunc(h.serveCreateDatabase))
h.mux.Del("/db/:name", http.HandlerFunc(h.serveDeleteDatabase))
h.mux.Get("/db", h.makeAuthenticationHandler(h.serveDatabases))
h.mux.Post("/db", h.makeAuthenticationHandler(h.serveCreateDatabase))
h.mux.Del("/db/:name", h.makeAuthenticationHandler(h.serveDeleteDatabase))
// Series routes.
h.mux.Get("/db/:db/series", http.HandlerFunc(h.serveQuery))
h.mux.Post("/db/:db/series", http.HandlerFunc(h.serveWriteSeries))
h.mux.Get("/db/:db/series", h.makeAuthenticationHandler(h.serveQuery))
h.mux.Post("/db/:db/series", h.makeAuthenticationHandler(h.serveWriteSeries))
// Shard routes.
h.mux.Get("/db/:db/shards", http.HandlerFunc(h.serveShards))
h.mux.Del("/db/:db/shards/:id", http.HandlerFunc(h.serveDeleteShard))
h.mux.Get("/db/:db/shards", h.makeAuthenticationHandler(h.serveShards))
h.mux.Del("/db/:db/shards/:id", h.makeAuthenticationHandler(h.serveDeleteShard))
// Retention policy routes.
h.mux.Get("/db/:db/retention_policies", http.HandlerFunc(h.serveRetentionPolicies))
h.mux.Post("/db/:db/retention_policies", http.HandlerFunc(h.serveCreateRetentionPolicy))
h.mux.Put("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveUpdateRetentionPolicy))
h.mux.Del("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveDeleteRetentionPolicy))
h.mux.Get("/db/:db/retention_policies", h.makeAuthenticationHandler(h.serveRetentionPolicies))
h.mux.Post("/db/:db/retention_policies", h.makeAuthenticationHandler(h.serveCreateRetentionPolicy))
h.mux.Put("/db/:db/retention_policies/:name", h.makeAuthenticationHandler(h.serveUpdateRetentionPolicy))
h.mux.Del("/db/:db/retention_policies/:name", h.makeAuthenticationHandler(h.serveDeleteRetentionPolicy))
// Data node routes.
h.mux.Get("/data_nodes", http.HandlerFunc(h.serveDataNodes))
h.mux.Post("/data_nodes", http.HandlerFunc(h.serveCreateDataNode))
h.mux.Del("/data_nodes/:id", http.HandlerFunc(h.serveDeleteDataNode))
h.mux.Get("/data_nodes", h.makeAuthenticationHandler(h.serveDataNodes))
h.mux.Post("/data_nodes", h.makeAuthenticationHandler(h.serveCreateDataNode))
h.mux.Del("/data_nodes/:id", h.makeAuthenticationHandler(h.serveDeleteDataNode))
// Utilities
h.mux.Get("/ping", http.HandlerFunc(h.servePing))
h.mux.Get("/ping", h.makeAuthenticationHandler(h.servePing))
return h
}
@ -89,8 +122,37 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.mux.ServeHTTP(w, r)
}
// makeAuthenticationHandler takes a custom handler and returns a standard handler, ensuring that
// the system's standard authentication policies have been applied before the custom handler is called.
//
// The standard policy is if authentication is disabled, all operations are allowed and no user credentials
// are required. If authentication is enabled, valid user credentials must be supplied.
func (h *Handler) makeAuthenticationHandler(fn func(http.ResponseWriter, *http.Request, *User)) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var user *User
if h.AuthenticationEnabled {
username, password, err := getUsernameAndPassword(r)
if err != nil {
h.error(w, err.Error(), http.StatusUnauthorized)
return
}
if username == "" {
h.error(w, "username required", http.StatusUnauthorized)
return
}
user, err = h.server.Authenticate(username, password)
if err != nil {
h.error(w, err.Error(), http.StatusUnauthorized)
return
}
}
fn(w, r, user)
}
}
// serveQuery parses an incoming query and returns the results.
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, u *User) {
// TODO: Authentication.
// Parse query from query string.
@ -129,7 +191,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request) {
}
// serveWriteSeries receives incoming series data and writes it to the database.
func (h *Handler) serveWriteSeries(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveWriteSeries(w http.ResponseWriter, r *http.Request, u *User) {
// TODO: Authentication.
/* TEMPORARILY REMOVED FOR PROTOBUFS.
@ -184,7 +246,7 @@ func (h *Handler) serveWriteSeries(w http.ResponseWriter, r *http.Request) {
}
// serveDatabases returns a list of all databases on the server.
func (h *Handler) serveDatabases(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveDatabases(w http.ResponseWriter, r *http.Request, u *User) {
// Retrieve databases from the server.
databases := h.server.Databases()
@ -195,7 +257,7 @@ func (h *Handler) serveDatabases(w http.ResponseWriter, r *http.Request) {
}
// serveCreateDatabase creates a new database on the server.
func (h *Handler) serveCreateDatabase(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveCreateDatabase(w http.ResponseWriter, r *http.Request, u *User) {
var req struct {
Name string `json:"name"`
}
@ -222,7 +284,7 @@ func (h *Handler) serveCreateDatabase(w http.ResponseWriter, r *http.Request) {
}
// serveDeleteDatabase deletes an existing database on the server.
func (h *Handler) serveDeleteDatabase(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveDeleteDatabase(w http.ResponseWriter, r *http.Request, u *User) {
name := r.URL.Query().Get(":name")
if err := h.server.DeleteDatabase(name); err == ErrDatabaseNotFound {
h.error(w, err.Error(), http.StatusNotFound)
@ -238,7 +300,7 @@ func (h *Handler) serveDeleteDatabase(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveAuthenticate(w http.ResponseWriter, r *http.Request) {}
// serveUsers returns data about a single user.
func (h *Handler) serveUsers(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveUsers(w http.ResponseWriter, r *http.Request, u *User) {
// Generate a list of objects for encoding to the API.
a := make([]*userJSON, 0)
@ -262,14 +324,31 @@ type userJSON struct {
// serveCreateUser creates a new user.
func (h *Handler) serveCreateUser(w http.ResponseWriter, r *http.Request) {
// Read in user from request body.
var u userJSON
if err := json.NewDecoder(r.Body).Decode(&u); err != nil {
var newUser userJSON
if err := json.NewDecoder(r.Body).Decode(&newUser); err != nil {
h.error(w, err.Error(), http.StatusBadRequest)
return
}
// Creating a User involves a non-standard authentication policy. Iff no Admin
// already exists, and the used being created will be an admin, no authorization
// is required.
if h.AuthenticationEnabled && (h.server.AdminUserExists() || !newUser.Admin) {
username, password, err := getUsernameAndPassword(r)
if err != nil {
h.error(w, err.Error(), http.StatusUnauthorized)
return
}
_, err = h.server.Authenticate(username, password)
if err != nil {
h.error(w, err.Error(), http.StatusUnauthorized)
return
}
}
// Create the user.
if err := h.server.CreateUser(u.Name, u.Password, u.Admin); err == ErrUserExists {
if err := h.server.CreateUser(newUser.Name, newUser.Password, newUser.Admin); err == ErrUserExists {
h.error(w, err.Error(), http.StatusConflict)
return
} else if err != nil {
@ -280,16 +359,16 @@ func (h *Handler) serveCreateUser(w http.ResponseWriter, r *http.Request) {
}
// serveUpdateUser updates an existing user.
func (h *Handler) serveUpdateUser(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveUpdateUser(w http.ResponseWriter, r *http.Request, u *User) {
// Read in user from request body.
var u userJSON
if err := json.NewDecoder(r.Body).Decode(&u); err != nil {
var user userJSON
if err := json.NewDecoder(r.Body).Decode(&user); err != nil {
h.error(w, err.Error(), http.StatusBadRequest)
return
}
// Update the user.
if err := h.server.UpdateUser(r.URL.Query().Get(":user"), u.Password); err == ErrUserNotFound {
// Create the user.
if err := h.server.UpdateUser(r.URL.Query().Get(":user"), user.Password); err == ErrUserNotFound {
h.error(w, err.Error(), http.StatusNotFound)
return
} else if err != nil {
@ -300,7 +379,7 @@ func (h *Handler) serveUpdateUser(w http.ResponseWriter, r *http.Request) {
}
// serveDeleteUser removes an existing user.
func (h *Handler) serveDeleteUser(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveDeleteUser(w http.ResponseWriter, r *http.Request, u *User) {
// Delete the user.
if err := h.server.DeleteUser(r.URL.Query().Get(":user")); err == ErrUserNotFound {
h.error(w, err.Error(), http.StatusNotFound)
@ -314,10 +393,10 @@ func (h *Handler) serveDeleteUser(w http.ResponseWriter, r *http.Request) {
}
// servePing returns a simple response to let the client know the server is running.
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {}
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request, u *User) {}
// serveShards returns a list of shards.
func (h *Handler) serveShards(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveShards(w http.ResponseWriter, r *http.Request, u *User) {
q := r.URL.Query()
// Retrieves shards for the database.
@ -336,10 +415,10 @@ func (h *Handler) serveShards(w http.ResponseWriter, r *http.Request) {
}
// serveDeleteShard removes an existing shard.
func (h *Handler) serveDeleteShard(w http.ResponseWriter, r *http.Request) {}
func (h *Handler) serveDeleteShard(w http.ResponseWriter, r *http.Request, u *User) {}
// serveRetentionPolicies returns a list of retention policys.
func (h *Handler) serveRetentionPolicies(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveRetentionPolicies(w http.ResponseWriter, r *http.Request, u *User) {
// Retrieve policies by database.
policies, err := h.server.RetentionPolicies(r.URL.Query().Get(":db"))
if err == ErrDatabaseNotFound {
@ -356,7 +435,7 @@ func (h *Handler) serveRetentionPolicies(w http.ResponseWriter, r *http.Request)
}
// serveCreateRetentionPolicy creates a new retention policy.
func (h *Handler) serveCreateRetentionPolicy(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveCreateRetentionPolicy(w http.ResponseWriter, r *http.Request, u *User) {
// Decode the policy from the body.
var policy RetentionPolicy
if err := json.NewDecoder(r.Body).Decode(&policy); err != nil {
@ -379,7 +458,7 @@ func (h *Handler) serveCreateRetentionPolicy(w http.ResponseWriter, r *http.Requ
}
// serveUpdateRetentionPolicy updates an existing retention policy.
func (h *Handler) serveUpdateRetentionPolicy(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveUpdateRetentionPolicy(w http.ResponseWriter, r *http.Request, u *User) {
q := r.URL.Query()
db, name := q.Get(":db"), q.Get(":name")
@ -402,7 +481,7 @@ func (h *Handler) serveUpdateRetentionPolicy(w http.ResponseWriter, r *http.Requ
}
// serveDeleteRetentionPolicy removes an existing retention policy.
func (h *Handler) serveDeleteRetentionPolicy(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveDeleteRetentionPolicy(w http.ResponseWriter, r *http.Request, u *User) {
q := r.URL.Query()
db, name := q.Get(":db"), q.Get(":name")
@ -418,7 +497,7 @@ func (h *Handler) serveDeleteRetentionPolicy(w http.ResponseWriter, r *http.Requ
}
// serveDataNodes returns a list of all data nodes in the cluster.
func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request, u *User) {
// Generate a list of objects for encoding to the API.
a := make([]*dataNodeJSON, 0)
for _, n := range h.server.DataNodes() {
@ -433,7 +512,7 @@ func (h *Handler) serveDataNodes(w http.ResponseWriter, r *http.Request) {
}
// serveCreateDataNode creates a new data node in the cluster.
func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request, u *User) {
// Read in data node from request body.
var n dataNodeJSON
if err := json.NewDecoder(r.Body).Decode(&n); err != nil {
@ -442,14 +521,14 @@ func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) {
}
// Parse the URL.
u, err := url.Parse(n.URL)
url, err := url.Parse(n.URL)
if err != nil {
h.error(w, "invalid data node url", http.StatusBadRequest)
return
}
// Create the data node.
if err := h.server.CreateDataNode(u); err == ErrDataNodeExists {
if err := h.server.CreateDataNode(url); err == ErrDataNodeExists {
h.error(w, err.Error(), http.StatusConflict)
return
} else if err != nil {
@ -458,14 +537,14 @@ func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) {
}
// Write new node back to client.
node := h.server.DataNodeByURL(u)
node := h.server.DataNodeByURL(url)
w.WriteHeader(http.StatusCreated)
w.Header().Add("content-type", "application/json")
_ = json.NewEncoder(w).Encode(&dataNodeJSON{ID: node.ID, URL: node.URL.String()})
}
// serveDeleteDataNode removes an existing node.
func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request, u *User) {
// Parse node id.
nodeID, err := strconv.ParseUint(r.URL.Query().Get(":id"), 10, 64)
if err != nil {

View File

@ -2,6 +2,8 @@ package influxdb_test
import (
"bytes"
"encoding/base64"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
@ -599,12 +601,89 @@ func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) {
}
}
// Perform a subset of endpoint testing, with authentication enabled.
func TestHandler_AuthenticatedCreateAdminUser(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
s := NewAuthenticatedHTTPServer(srvr)
defer s.Close()
// Attempting to create a non-admin user should fail.
status, _ := MustHTTP("POST", s.URL+`/users`, `{"name": "maeve", "password": "pass", "Admin": false}`)
if status != http.StatusUnauthorized {
t.Fatalf("unexpected status: %d", status)
}
// Creating the first admin user, without supplying authentication
// credentials should be OK.
status, _ = MustHTTP("POST", s.URL+`/users`, `{"name": "orla", "password": "pass", "Admin": true}`)
if status != http.StatusCreated {
t.Fatalf("unexpected status: %d", status)
}
// Creating a second admin user, without supplying authentication
// credentials should fail.
status, _ = MustHTTP("POST", s.URL+`/users`, `{"name": "louise", "password": "pass", "Admin": true}`)
if status != http.StatusUnauthorized {
t.Fatalf("unexpected status: %d", status)
}
}
func TestHandler_AuthenticatedDatabases_Unauthorized(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
s := NewAuthenticatedHTTPServer(srvr)
defer s.Close()
status, _ := MustHTTP("GET", s.URL+`/db`, "")
if status != http.StatusUnauthorized {
t.Fatalf("unexpected status: %d", status)
}
}
func TestHandler_AuthenticatedDatabases_AuthorizedQueryParams(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
srvr.CreateUser("lisa", "password", true)
s := NewAuthenticatedHTTPServer(srvr)
defer s.Close()
status, _ := MustHTTP("GET", s.URL+`/db?u=lisa&p=password`, "")
if status != http.StatusOK {
t.Fatalf("unexpected status: %d", status)
}
}
func TestHandler_AuthenticatedDatabases_AuthorizedBasicAuth(t *testing.T) {
srvr := OpenServer(NewMessagingClient())
srvr.CreateUser("lisa", "password", true)
s := NewAuthenticatedHTTPServer(srvr)
defer s.Close()
auth := make(map[string]string)
auth["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte("lisa:password"))
fmt.Println(auth)
status, _ := MustHTTPWithHeaders("GET", s.URL+`/db`, auth, "")
if status != http.StatusOK {
t.Fatalf("unexpected status: %d", status)
}
}
// Utility functions for this test suite.
func MustHTTP(verb, url, body string) (int, string) {
return MustHTTPWithHeaders(verb, url, nil, body)
}
func MustHTTPWithHeaders(verb, url string, headers map[string]string, body string) (int, string) {
req, err := http.NewRequest(verb, url, bytes.NewBuffer([]byte(body)))
if err != nil {
panic(err)
}
req.Header.Set("Content-Type", "applicaton/json")
for k, v := range headers {
req.Header.Set(k, v)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
@ -637,6 +716,12 @@ func NewHTTPServer(s *Server) *HTTPServer {
return &HTTPServer{httptest.NewServer(h), h}
}
func NewAuthenticatedHTTPServer(s *Server) *HTTPServer {
h := influxdb.NewHandler(s.Server)
h.AuthenticationEnabled = true
return &HTTPServer{httptest.NewServer(h), h}
}
func (s *HTTPServer) Close() {
s.Server.Close()
}

View File

@ -55,10 +55,18 @@ func (_ *ListTagKeysStatement) node() {}
func (_ *ListTagValuesStatement) node() {}
func (_ *ListFieldKeysStatement) node() {}
func (_ *ListFieldValuesStatement) node() {}
func (_ *DropSeriesStatement) node() {}
func (_ *ListContinuousQueriesStatement) node() {}
func (_ *CreateContinuousQueryStatement) node() {}
func (_ *DropSeriesStatement) node() {}
func (_ *DropContinuousQueryStatement) node() {}
func (_ *DropDatabaseStatement) node() {}
func (_ *DropUserStatement) node() {}
func (_ *CreateContinuousQueryStatement) node() {}
func (_ *CreateDatabaseStatement) node() {}
func (_ *CreateUserStatement) node() {}
func (_ *CreateRetentionPolicyStatement) node() {}
func (_ *GrantStatement) node() {}
func (_ *RevokeStatement) node() {}
func (_ *AlterRetentionPolicyStatement) node() {}
func (_ Fields) node() {}
func (_ *Field) node() {}
@ -119,6 +127,14 @@ func (_ *ListTagKeysStatement) stmt() {}
func (_ *ListTagValuesStatement) stmt() {}
func (_ *ListFieldKeysStatement) stmt() {}
func (_ *ListFieldValuesStatement) stmt() {}
func (_ *CreateDatabaseStatement) stmt() {}
func (_ *CreateUserStatement) stmt() {}
func (_ *GrantStatement) stmt() {}
func (_ *RevokeStatement) stmt() {}
func (_ *CreateRetentionPolicyStatement) stmt() {}
func (_ *DropDatabaseStatement) stmt() {}
func (_ *DropUserStatement) stmt() {}
func (_ *AlterRetentionPolicyStatement) stmt() {}
// Expr represents an expression that can be evaluated to a value.
type Expr interface {
@ -177,6 +193,219 @@ func (a SortFields) String() string {
return strings.Join(fields, ", ")
}
// CreateDatabaseStatement represents a command for creating a new database.
type CreateDatabaseStatement struct {
// Name of the database to be created.
Name string
}
// String returns a string representation of the create database statement.
func (s *CreateDatabaseStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("CREATE DATABASE ")
_, _ = buf.WriteString(s.Name)
return buf.String()
}
// DropDatabaseStatement represents a command to drop a database.
type DropDatabaseStatement struct {
// Name of the database to be dropped.
Name string
}
// String returns a string representation of the drop database statement.
func (s *DropDatabaseStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("DROP DATABASE ")
_, _ = buf.WriteString(s.Name)
return buf.String()
}
// CreateUserStatement represents a command for creating a new user.
type CreateUserStatement struct {
// Name of the user to be created.
Name string
// User's password
Password string
}
// String returns a string representation of the create user statement.
func (s *CreateUserStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("CREATE USER ")
_, _ = buf.WriteString(s.Name)
_, _ = buf.WriteString(" WITH PASSWORD ")
_, _ = buf.WriteString(s.Password)
return buf.String()
}
// DropUserStatement represents a command for dropping a user.
type DropUserStatement struct {
// Name of the user to drop.
Name string
}
// String returns a string representation of the drop user statement.
func (s *DropUserStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("DROP USER ")
_, _ = buf.WriteString(s.Name)
return buf.String()
}
// Privilege is a type of action a user can be granted the right to use.
type Privilege int
const (
ReadPrivilege Privilege = iota
WritePrivilege
AllPrivileges
)
// String returns a string representation of a Privilege.
func (p Privilege) String() string {
switch p {
case ReadPrivilege:
return "READ"
case WritePrivilege:
return "WRITE"
case AllPrivileges:
return "ALL PRIVILEGES"
}
return ""
}
// GrantStatement represents a command for granting a privilege.
type GrantStatement struct {
// The privilege to be granted.
Privilege Privilege
// Thing to grant privilege on (e.g., a DB).
On string
// Who to grant the privilege to.
User string
}
// String returns a string representation of the grant statement.
func (s *GrantStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("GRANT ")
_, _ = buf.WriteString(s.Privilege.String())
if s.On != "" {
_, _ = buf.WriteString(" ON ")
_, _ = buf.WriteString(s.On)
}
_, _ = buf.WriteString(" TO ")
_, _ = buf.WriteString(s.User)
return buf.String()
}
// RevokeStatement represents a command to revoke a privilege from a user.
type RevokeStatement struct {
// Privilege to be revoked.
Privilege Privilege
// Thing to revoke privilege to (e.g., a DB)
On string
// Who to revoke privilege from.
User string
}
// String returns a string representation of the revoke statement.
func (s *RevokeStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("REVOKE ")
_, _ = buf.WriteString(s.Privilege.String())
if s.On != "" {
_, _ = buf.WriteString(" ON ")
_, _ = buf.WriteString(s.On)
}
_, _ = buf.WriteString(" FROM ")
_, _ = buf.WriteString(s.User)
return buf.String()
}
// CreateRetentionPolicyStatement represents a command to create a retention policy.
type CreateRetentionPolicyStatement struct {
// Name of policy to create.
Name string
// Name of database this policy belongs to.
DB string
// Duration data written to this policy will be retained.
Duration time.Duration
// Replication factor for data written to this policy.
Replication int
// Should this policy be set as default for the database?
Default bool
}
// String returns a string representation of the create retention policy.
func (s *CreateRetentionPolicyStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("CREATE RETENTION POLICY ")
_, _ = buf.WriteString(s.Name)
_, _ = buf.WriteString(" ON ")
_, _ = buf.WriteString(s.DB)
_, _ = buf.WriteString(" DURATION ")
_, _ = buf.WriteString(FormatDuration(s.Duration))
_, _ = buf.WriteString(" REPLICATION ")
_, _ = buf.WriteString(strconv.Itoa(s.Replication))
if s.Default {
_, _ = buf.WriteString(" DEFAULT")
}
return buf.String()
}
// AlterRetentionPolicyStatement represents a command to alter an existing retention policy.
type AlterRetentionPolicyStatement struct {
// Name of policy to alter.
Name string
// Name of the database this policy belongs to.
DB string
// Duration data written to this policy will be retained.
Duration *time.Duration
// Replication factor for data written to this policy.
Replication *int
// Should this policy be set as defalut for the database?
Default bool
}
// String returns a string representation of the alter retention policy statement.
func (s *AlterRetentionPolicyStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("ALTER RETENTION POLICY ")
_, _ = buf.WriteString(s.Name)
_, _ = buf.WriteString(" ON ")
_, _ = buf.WriteString(s.DB)
if s.Duration != nil {
_, _ = buf.WriteString(" DURATION ")
_, _ = buf.WriteString(FormatDuration(*s.Duration))
}
if s.Replication != nil {
_, _ = buf.WriteString(" REPLICATION ")
_, _ = buf.WriteString(strconv.Itoa(*s.Replication))
}
if s.Default {
_, _ = buf.WriteString(" DEFAULT")
}
return buf.String()
}
// SelectStatement represents a command for extracting data from the database.
type SelectStatement struct {
// Expressions returned from the selection.

View File

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"math"
"regexp"
"strconv"
"strings"
@ -68,50 +69,377 @@ func (p *Parser) ParseStatement() (Statement, error) {
case DELETE:
return p.parseDeleteStatement()
case LIST:
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == SERIES {
return p.parseListSeriesStatement()
} else if tok == CONTINUOUS {
return p.parseListContinuousQueriesStatement()
} else if tok == MEASUREMENTS {
return p.parseListMeasurementsStatement()
} else if tok == TAG {
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == KEYS {
return p.parseListTagKeysStatement()
} else if tok == VALUES {
return p.parseListTagValuesStatement()
} else {
return nil, newParseError(tokstr(tok, lit), []string{"KEYS", "VALUES"}, pos)
}
} else if tok == FIELD {
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == KEYS {
return p.parseListFieldKeysStatement()
} else if tok == VALUES {
return p.parseListFieldValuesStatement()
} else {
return nil, newParseError(tokstr(tok, lit), []string{"KEYS", "VALUES"}, pos)
}
} else {
return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS"}, pos)
}
return p.parseListStatement()
case CREATE:
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == CONTINUOUS {
return p.parseCreateContinuousQueryStatement()
} else {
return nil, newParseError(tokstr(tok, lit), []string{"CONTINUOUS"}, pos)
}
return p.parseCreateStatement()
case DROP:
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == SERIES {
return p.parseDropSeriesStatement()
} else if tok == CONTINUOUS {
return p.parseDropContinuousQueryStatement()
} else {
return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS"}, pos)
}
return p.parseDropStatement()
case GRANT:
return p.parseGrantStatement()
case REVOKE:
return p.parseRevokeStatement()
case ALTER:
return p.parseAlterStatement()
default:
return nil, newParseError(tokstr(tok, lit), []string{"SELECT"}, pos)
}
}
// parseListStatement parses a string and returns a list statement.
// This function assumes the LIST token has already been consumed.
func (p *Parser) parseListStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == SERIES {
return p.parseListSeriesStatement()
} else if tok == CONTINUOUS {
return p.parseListContinuousQueriesStatement()
} else if tok == MEASUREMENTS {
return p.parseListMeasurementsStatement()
} else if tok == TAG {
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == KEYS {
return p.parseListTagKeysStatement()
} else if tok == VALUES {
return p.parseListTagValuesStatement()
} else {
return nil, newParseError(tokstr(tok, lit), []string{"KEYS", "VALUES"}, pos)
}
} else if tok == FIELD {
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == KEYS {
return p.parseListFieldKeysStatement()
} else if tok == VALUES {
return p.parseListFieldValuesStatement()
} else {
return nil, newParseError(tokstr(tok, lit), []string{"KEYS", "VALUES"}, pos)
}
}
return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS", "MEASUREMENTS", "TAG", "FIELD"}, pos)
}
// parseCreateStatement parses a string and returns a create statement.
// This function assumes the CREATE token has already been consumned.
func (p *Parser) parseCreateStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == CONTINUOUS {
return p.parseCreateContinuousQueryStatement()
} else if tok == DATABASE {
return p.parseCreateDatabaseStatement()
} else if tok == USER {
return p.parseCreateUserStatement()
} else if tok == RETENTION {
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != POLICY {
return nil, newParseError(tokstr(tok, lit), []string{"POLICY"}, pos)
}
return p.parseCreateRetentionPolicyStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"CONTINUOUS", "DATABASE", "USER", "RETENTION"}, pos)
}
// parseDropStatement parses a string and returns a drop statement.
// This function assumes the DROP token has already been consumed.
func (p *Parser) parseDropStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == SERIES {
return p.parseDropSeriesStatement()
} else if tok == CONTINUOUS {
return p.parseDropContinuousQueryStatement()
} else if tok == DATABASE {
return p.parseDropDatabaseStatement()
} else if tok == USER {
return p.parseDropUserStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS"}, pos)
}
// parseAlterStatement parses a string and returns an alter statement.
// This function assumes the ALTER token has already been consumed.
func (p *Parser) parseAlterStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == RETENTION {
if tok, pos, lit = p.scanIgnoreWhitespace(); tok != POLICY {
return nil, newParseError(tokstr(tok, lit), []string{"POLICY"}, pos)
}
return p.parseAlterRetentionPolicyStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"RETENTION"}, pos)
}
// parseCreateRetentionPolicyStatement parses a string and returns a create retention policy statement.
// This function assumes the CREATE RETENTION POLICY tokens have already been consumed.
func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicyStatement, error) {
stmt := &CreateRetentionPolicyStatement{}
// Parse the retention policy name.
ident, err := p.parseIdentifier()
if err != nil {
return nil, err
}
stmt.Name = ident
// Consume the required ON token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != ON {
return nil, newParseError(tokstr(tok, lit), []string{"ON"}, pos)
}
// Parse the database name.
ident, err = p.parseIdentifier()
if err != nil {
return nil, err
}
stmt.DB = ident
// Parse required DURATION token.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != DURATION {
return nil, newParseError(tokstr(tok, lit), []string{"DURATION"}, pos)
}
// Parse duration value
d, err := p.parseDuration()
if err != nil {
return nil, err
}
stmt.Duration = d
// Parse required REPLICATION token.
if tok, pos, lit = p.scanIgnoreWhitespace(); tok != REPLICATION {
return nil, newParseError(tokstr(tok, lit), []string{"REPLICATION"}, pos)
}
// Parse replication value.
n, err := p.parseInt(1, math.MaxInt32)
if err != nil {
return nil, err
}
stmt.Replication = n
// Parse optional DEFAULT token.
if tok, pos, lit = p.scanIgnoreWhitespace(); tok == DEFAULT {
stmt.Default = true
} else {
p.unscan()
}
return stmt, nil
}
// parseAlterRetentionPolicyStatement parses a string and returns an alter retention policy statement.
// This function assumes the ALTER RETENTION POLICY tokens have already been consumned.
func (p *Parser) parseAlterRetentionPolicyStatement() (*AlterRetentionPolicyStatement, error) {
stmt := &AlterRetentionPolicyStatement{}
// Parse the retention policy name.
ident, err := p.parseIdentifier()
if err != nil {
return nil, err
}
stmt.Name = ident
// Consume the required ON token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != ON {
return nil, newParseError(tokstr(tok, lit), []string{"ON"}, pos)
}
// Parse the database name.
ident, err = p.parseIdentifier()
if err != nil {
return nil, err
}
stmt.DB = ident
// Loop through option tokens (DURATION, RETENTION, DEFAULT, etc.).
maxNumOptions := 3
Loop:
for i := 0; i < maxNumOptions; i++ {
tok, pos, lit := p.scanIgnoreWhitespace()
switch tok {
case DURATION:
d, err := p.parseDuration()
if err != nil {
return nil, err
}
stmt.Duration = &d
case REPLICATION:
n, err := p.parseInt(1, math.MaxInt32)
if err != nil {
return nil, err
}
stmt.Replication = &n
case DEFAULT:
stmt.Default = true
default:
if i < 1 {
return nil, newParseError(tokstr(tok, lit), []string{"DURATION", "RETENTION", "DEFAULT"}, pos)
}
p.unscan()
break Loop
}
}
return stmt, nil
}
// parseInt parses a string and returns an integer literal.
func (p *Parser) parseInt(min, max int) (int, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != NUMBER {
return 0, newParseError(tokstr(tok, lit), []string{"number"}, pos)
}
// Return an error if the number has a fractional part.
if strings.Contains(lit, ".") {
return 0, &ParseError{Message: "number must be an integer", Pos: pos}
}
// Convert string to int.
n, err := strconv.Atoi(lit)
if err != nil {
return 0, &ParseError{Message: err.Error(), Pos: pos}
} else if min > n || n > max {
return 0, &ParseError{
Message: fmt.Sprintf("invalid value %d: must be %d <= n <= %d", n, min, max),
Pos: pos,
}
}
return n, nil
}
// parseDuration parses a string and returns a duration literal.
// This function assumes the DURATION token has already been consumed.
func (p *Parser) parseDuration() (time.Duration, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != DURATION_VAL {
return 0, newParseError(tokstr(tok, lit), []string{"duration"}, pos)
}
d, err := ParseDuration(lit)
if err != nil {
return 0, &ParseError{Message: err.Error(), Pos: pos}
}
return d, nil
}
// parserIdentifier parses a string and returns an identifier.
func (p *Parser) parseIdentifier() (string, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return "", newParseError(tokstr(tok, lit), []string{"identifier"}, pos)
}
return lit, nil
}
// parseRevokeStatement parses a string and returns a revoke statement.
// This function assumes the REVOKE token has already been consumend.
func (p *Parser) parseRevokeStatement() (*RevokeStatement, error) {
stmt := &RevokeStatement{}
// Parse the privilege to be granted.
priv, err := p.parsePrivilege()
if err != nil {
return nil, err
}
stmt.Privilege = priv
// Parse ON clause.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == ON {
// Parse the name of the thing we're granting a privilege to use.
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
}
stmt.On = lit
tok, pos, lit = p.scanIgnoreWhitespace()
} else if priv != AllPrivileges {
// ALL PRIVILEGES is the only privilege allowed cluster-wide.
// No ON clause means query is requesting cluster-wide.
return nil, newParseError(tokstr(tok, lit), []string{"ON"}, pos)
}
// Check for required FROM token.
if tok != FROM {
return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
}
// Parse the name of the user we're granting the privilege to.
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
}
stmt.User = lit
return stmt, nil
}
// parseGrantStatement parses a string and returns a grant statement.
// This function assumes the GRANT token has already been consumed.
func (p *Parser) parseGrantStatement() (*GrantStatement, error) {
stmt := &GrantStatement{}
// Parse the privilege to be granted.
priv, err := p.parsePrivilege()
if err != nil {
return nil, err
}
stmt.Privilege = priv
// Parse ON clause.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == ON {
// Parse the name of the thing we're granting a privilege to use.
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
}
stmt.On = lit
tok, pos, lit = p.scanIgnoreWhitespace()
} else if priv != AllPrivileges {
// ALL PRIVILEGES is the only privilege allowed cluster-wide.
// No ON clause means query is requesting cluster-wide.
return nil, newParseError(tokstr(tok, lit), []string{"ON"}, pos)
}
// Check for required TO token.
if tok != TO {
return nil, newParseError(tokstr(tok, lit), []string{"TO"}, pos)
}
// Parse the name of the user we're granting the privilege to.
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
}
stmt.User = lit
return stmt, nil
}
// parsePrivilege parses a string and returns a Privilege
func (p *Parser) parsePrivilege() (Privilege, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
switch tok {
case READ:
return ReadPrivilege, nil
case WRITE:
return WritePrivilege, nil
case ALL:
// Consume optional PRIVILEGES token
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != PRIVILEGES {
p.unscan()
}
return AllPrivileges, nil
}
return 0, newParseError(tokstr(tok, lit), []string{"READ", "WRITE", "ALL [PRIVILEGES]"}, pos)
}
// parseSelectStatement parses a select string and returns a Statement AST object.
// This function assumes the SELECT token has already been consumed.
func (p *Parser) parseSelectStatement() (*SelectStatement, error) {
@ -479,6 +807,110 @@ func (p *Parser) parseCreateContinuousQueryStatement() (*CreateContinuousQuerySt
return stmt, nil
}
// parseCreateDatabaseStatement parses a string and returns a CreateDatabaseStatement.
// This function assumes the "CREATE DATABASE" tokens have already been consumed.
func (p *Parser) parseCreateDatabaseStatement() (*CreateDatabaseStatement, error) {
stmt := &CreateDatabaseStatement{}
// Parse the name of the database to be created.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier"}, pos)
}
stmt.Name = lit
return stmt, nil
}
// parseDropDatabaseStatement parses a string and returns a DropDatabaseStatement.
// This function assumes the DROP DATABASE tokens have already been consumed.
func (p *Parser) parseDropDatabaseStatement() (*DropDatabaseStatement, error) {
stmt := &DropDatabaseStatement{}
// Parse the name of the database to be dropped.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier"}, pos)
}
stmt.Name = lit
return stmt, nil
}
// parseCreateUserStatement parses a string and returns a CreateUserStatement.
// This function assumes the "CREATE USER" tokens have already been consumed.
func (p *Parser) parseCreateUserStatement() (*CreateUserStatement, error) {
stmt := &CreateUserStatement{}
// Parse name of the user to be created.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
}
stmt.Name = lit
// Consume "WITH PASSWORD" tokens
if err := p.parseTokens([]Token{WITH, PASSWORD}); err != nil {
return nil, err
}
// Parse new user's password
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
}
stmt.Password = lit
return stmt, nil
}
// parseDropUserStatement parses a string and returns a DropUserStatement.
// This function assumes the DROP USER tokens have already been consumed.
func (p *Parser) parseDropUserStatement() (*DropUserStatement, error) {
stmt := &DropUserStatement{}
// Parse the name of the user to be dropped.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier"}, pos)
}
stmt.Name = lit
return stmt, nil
}
// parseRetentionPolicy parses a string and returns a retention policy name.
// This function assumes the "WITH" token has already been consumed.
func (p *Parser) parseRetentionPolicy() (name string, dfault bool, err error) {
// Check for optional DEFAULT token.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == DEFAULT {
dfault = true
tok, pos, lit = p.scanIgnoreWhitespace()
}
// Check for required RETENTION token.
if tok != RETENTION {
err = newParseError(tokstr(tok, lit), []string{"RETENTION"}, pos)
return
}
// Check of required POLICY token.
if tok, pos, lit = p.scanIgnoreWhitespace(); tok != POLICY {
err = newParseError(tokstr(tok, lit), []string{"POLICY"}, pos)
return
}
// Parse retention policy name.
tok, pos, name = p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
err = newParseError(tokstr(tok, name), []string{"identifier"}, pos)
return
}
return
}
// parseDropContinuousQueriesStatement parses a string and returns a DropContinuousQueryStatement.
// This function assumes the "DROP CONTINUOUS" tokens have already been consumed.
func (p *Parser) parseDropContinuousQueryStatement() (*DropContinuousQueryStatement, error) {
@ -879,7 +1311,7 @@ func (p *Parser) parseUnaryExpr() (Expr, error) {
return &NumberLiteral{Val: v}, nil
case TRUE, FALSE:
return &BooleanLiteral{Val: (tok == TRUE)}, nil
case DURATION:
case DURATION_VAL:
v, _ := ParseDuration(lit)
return &DurationLiteral{Val: v}, nil
default:
@ -1023,6 +1455,16 @@ func FormatDuration(d time.Duration) string {
}
}
// parseTokens consumes an expected sequence of tokens.
func (p *Parser) parseTokens(toks []Token) error {
for _, expected := range toks {
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != expected {
return newParseError(tokstr(tok, lit), []string{tokens[expected]}, pos)
}
}
return nil
}
// Quote returns a quoted string.
func Quote(s string) string {
return `"` + strings.NewReplacer("\n", `\n`, `\`, `\\`, `"`, `\"`).Replace(s) + `"`

View File

@ -290,12 +290,192 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// CREATE DATABASE statement
{
s: `CREATE DATABASE testdb`,
stmt: &influxql.CreateDatabaseStatement{
Name: "testdb",
},
},
// CREATE USER statement
{
s: `CREATE USER testuser WITH PASSWORD pwd1337`,
stmt: &influxql.CreateUserStatement{
Name: "testuser",
Password: "pwd1337",
},
},
// DROP CONTINUOUS QUERY statement
{
s: `DROP CONTINUOUS QUERY myquery`,
stmt: &influxql.DropContinuousQueryStatement{Name: "myquery"},
},
// DROP DATABASE statement
{
s: `DROP DATABASE testdb`,
stmt: &influxql.DropDatabaseStatement{Name: "testdb"},
},
// DROP USER statement
{
s: `DROP USER jdoe`,
stmt: &influxql.DropUserStatement{Name: "jdoe"},
},
// GRANT READ
{
s: `GRANT READ ON testdb TO jdoe`,
stmt: &influxql.GrantStatement{
Privilege: influxql.ReadPrivilege,
On: "testdb",
User: "jdoe",
},
},
// GRANT WRITE
{
s: `GRANT WRITE ON testdb TO jdoe`,
stmt: &influxql.GrantStatement{
Privilege: influxql.WritePrivilege,
On: "testdb",
User: "jdoe",
},
},
// GRANT ALL
{
s: `GRANT ALL ON testdb TO jdoe`,
stmt: &influxql.GrantStatement{
Privilege: influxql.AllPrivileges,
On: "testdb",
User: "jdoe",
},
},
// GRANT ALL PRIVILEGES
{
s: `GRANT ALL PRIVILEGES ON testdb TO jdoe`,
stmt: &influxql.GrantStatement{
Privilege: influxql.AllPrivileges,
On: "testdb",
User: "jdoe",
},
},
// GRANT cluster admin
{
s: `GRANT ALL PRIVILEGES TO jdoe`,
stmt: &influxql.GrantStatement{
Privilege: influxql.AllPrivileges,
User: "jdoe",
},
},
// REVOKE READ
{
s: `REVOKE READ on testdb FROM jdoe`,
stmt: &influxql.RevokeStatement{
Privilege: influxql.ReadPrivilege,
On: "testdb",
User: "jdoe",
},
},
// REVOKE WRITE
{
s: `REVOKE WRITE ON testdb FROM jdoe`,
stmt: &influxql.RevokeStatement{
Privilege: influxql.WritePrivilege,
On: "testdb",
User: "jdoe",
},
},
// REVOKE ALL
{
s: `REVOKE ALL ON testdb FROM jdoe`,
stmt: &influxql.RevokeStatement{
Privilege: influxql.AllPrivileges,
On: "testdb",
User: "jdoe",
},
},
// REVOKE ALL PRIVILEGES
{
s: `REVOKE ALL PRIVILEGES ON testdb FROM jdoe`,
stmt: &influxql.RevokeStatement{
Privilege: influxql.AllPrivileges,
On: "testdb",
User: "jdoe",
},
},
// REVOKE cluster admin
{
s: `REVOKE ALL FROM jdoe`,
stmt: &influxql.RevokeStatement{
Privilege: influxql.AllPrivileges,
User: "jdoe",
},
},
// CREATE RETENTION POLICY
{
s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 2`,
stmt: &influxql.CreateRetentionPolicyStatement{
Name: "policy1",
DB: "testdb",
Duration: time.Hour,
Replication: 2,
},
},
// CREATE RETENTION POLICY ... DEFAULT
{
s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 2m REPLICATION 4 DEFAULT`,
stmt: &influxql.CreateRetentionPolicyStatement{
Name: "policy1",
DB: "testdb",
Duration: 2 * time.Minute,
Replication: 4,
Default: true,
},
},
// ALTER RETENTION POLICY
{
s: `ALTER RETENTION POLICY policy1 ON testdb DURATION 1m REPLICATION 4 DEFAULT`,
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", time.Minute, 4, true),
},
// ALTER RETENTION POLICY with options in reverse order
{
s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT REPLICATION 4 DURATION 1m`,
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", time.Minute, 4, true),
},
// ALTER RETENTION POLICY without optional DURATION
{
s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT REPLICATION 4`,
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, 4, true),
},
// ALTER RETENTION POLICY without optional REPLICATION
{
s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT`,
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, -1, true),
},
// ALTER RETENTION POLICY without optional DEFAULT
{
s: `ALTER RETENTION POLICY policy1 ON testdb REPLICATION 4`,
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, 4, false),
},
// Errors
{s: ``, err: `found EOF, expected SELECT at line 1, char 1`},
{s: `SELECT`, err: `found EOF, expected identifier, string, number, bool at line 1, char 8`},
@ -319,10 +499,44 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
{s: `DROP SERIES`, err: `found EOF, expected identifier, string at line 1, char 13`},
{s: `LIST CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`},
{s: `LIST FOO`, err: `found FOO, expected SERIES, CONTINUOUS at line 1, char 6`},
{s: `LIST FOO`, err: `found FOO, expected SERIES, CONTINUOUS, MEASUREMENTS, TAG, FIELD at line 1, char 6`},
{s: `DROP CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 17`},
{s: `DROP CONTINUOUS QUERY`, err: `found EOF, expected identifier, string at line 1, char 23`},
{s: `DROP FOO`, err: `found FOO, expected SERIES, CONTINUOUS at line 1, char 6`},
{s: `DROP DATABASE`, err: `found EOF, expected identifier at line 1, char 15`},
{s: `DROP USER`, err: `found EOF, expected identifier at line 1, char 11`},
{s: `CREATE USER testuser`, err: `found EOF, expected WITH at line 1, char 22`},
{s: `GRANT`, err: `found EOF, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 7`},
{s: `GRANT BOGUS`, err: `found BOGUS, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 7`},
{s: `GRANT READ`, err: `found EOF, expected ON at line 1, char 12`},
{s: `GRANT READ TO jdoe`, err: `found TO, expected ON at line 1, char 12`},
{s: `GRANT READ ON`, err: `found EOF, expected identifier, string at line 1, char 15`},
{s: `GRANT READ ON testdb`, err: `found EOF, expected TO at line 1, char 22`},
{s: `GRANT READ ON testdb TO`, err: `found EOF, expected identifier, string at line 1, char 25`}, {s: `GRANT`, err: `found EOF, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 7`},
{s: `REVOKE BOGUS`, err: `found BOGUS, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 8`},
{s: `REVOKE READ`, err: `found EOF, expected ON at line 1, char 13`},
{s: `REVOKE READ TO jdoe`, err: `found TO, expected ON at line 1, char 13`},
{s: `REVOKE READ ON`, err: `found EOF, expected identifier, string at line 1, char 16`},
{s: `REVOKE READ ON testdb`, err: `found EOF, expected FROM at line 1, char 23`},
{s: `REVOKE READ ON testdb FROM`, err: `found EOF, expected identifier, string at line 1, char 28`},
{s: `CREATE RETENTION`, err: `found EOF, expected POLICY at line 1, char 18`},
{s: `CREATE RETENTION POLICY`, err: `found EOF, expected identifier at line 1, char 25`},
{s: `CREATE RETENTION POLICY policy1`, err: `found EOF, expected ON at line 1, char 33`},
{s: `CREATE RETENTION POLICY policy1 ON`, err: `found EOF, expected identifier at line 1, char 36`},
{s: `CREATE RETENTION POLICY policy1 ON testdb`, err: `found EOF, expected DURATION at line 1, char 43`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION`, err: `found EOF, expected duration at line 1, char 52`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION bad`, err: `found bad, expected duration at line 1, char 52`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h`, err: `found EOF, expected REPLICATION at line 1, char 54`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION`, err: `found EOF, expected number at line 1, char 67`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 3.14`, err: `number must be an integer at line 1, char 67`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 0`, err: `invalid value 0: must be 1 <= n <= 2147483647 at line 1, char 67`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION bad`, err: `found bad, expected number at line 1, char 67`},
{s: `ALTER`, err: `found EOF, expected RETENTION at line 1, char 7`},
{s: `ALTER RETENTION`, err: `found EOF, expected POLICY at line 1, char 17`},
{s: `ALTER RETENTION POLICY`, err: `found EOF, expected identifier at line 1, char 24`},
{s: `ALTER RETENTION POLICY policy1`, err: `found EOF, expected ON at line 1, char 32`},
{s: `ALTER RETENTION POLICY policy1 ON`, err: `found EOF, expected identifier at line 1, char 35`},
{s: `ALTER RETENTION POLICY policy1 ON testdb`, err: `found EOF, expected DURATION, RETENTION, DEFAULT at line 1, char 42`},
}
for i, tt := range tests {
@ -615,3 +829,22 @@ func errstring(err error) string {
}
return ""
}
// newAlterRetentionPolicyStatement creates an initialized AlterRetentionPolicyStatement.
func newAlterRetentionPolicyStatement(name string, DB string, d time.Duration, replication int, dfault bool) *influxql.AlterRetentionPolicyStatement {
stmt := &influxql.AlterRetentionPolicyStatement{
Name: name,
DB: DB,
Default: dfault,
}
if d > -1 {
stmt.Duration = &d
}
if replication > -1 {
stmt.Replication = &replication
}
return stmt
}

View File

@ -221,7 +221,7 @@ func (s *Scanner) scanNumber() (tok Token, pos Pos, lit string) {
// If the next rune is a duration unit (u,µ,ms,s) then return a duration token
if ch0, _ := s.r.read(); ch0 == 'u' || ch0 == 'µ' || ch0 == 's' || ch0 == 'h' || ch0 == 'd' || ch0 == 'w' {
_, _ = buf.WriteRune(ch0)
return DURATION, pos, buf.String()
return DURATION_VAL, pos, buf.String()
} else if ch0 == 'm' {
_, _ = buf.WriteRune(ch0)
if ch1, _ := s.r.read(); ch1 == 's' {
@ -229,7 +229,7 @@ func (s *Scanner) scanNumber() (tok Token, pos Pos, lit string) {
} else {
s.r.unread()
}
return DURATION, pos, buf.String()
return DURATION_VAL, pos, buf.String()
}
s.r.unread()
}

View File

@ -85,27 +85,37 @@ func TestScanner_Scan(t *testing.T) {
{s: `10.3s`, tok: influxql.NUMBER, lit: `10.3`},
// Durations
{s: `10u`, tok: influxql.DURATION, lit: `10u`},
{s: `10µ`, tok: influxql.DURATION, lit: `10µ`},
{s: `10ms`, tok: influxql.DURATION, lit: `10ms`},
{s: `-1s`, tok: influxql.DURATION, lit: `-1s`},
{s: `10m`, tok: influxql.DURATION, lit: `10m`},
{s: `10h`, tok: influxql.DURATION, lit: `10h`},
{s: `10d`, tok: influxql.DURATION, lit: `10d`},
{s: `10w`, tok: influxql.DURATION, lit: `10w`},
{s: `10u`, tok: influxql.DURATION_VAL, lit: `10u`},
{s: `10µ`, tok: influxql.DURATION_VAL, lit: `10µ`},
{s: `10ms`, tok: influxql.DURATION_VAL, lit: `10ms`},
{s: `-1s`, tok: influxql.DURATION_VAL, lit: `-1s`},
{s: `10m`, tok: influxql.DURATION_VAL, lit: `10m`},
{s: `10h`, tok: influxql.DURATION_VAL, lit: `10h`},
{s: `10d`, tok: influxql.DURATION_VAL, lit: `10d`},
{s: `10w`, tok: influxql.DURATION_VAL, lit: `10w`},
{s: `10x`, tok: influxql.NUMBER, lit: `10`}, // non-duration unit
// Keywords
{s: `ALL`, tok: influxql.ALL},
{s: `ALTER`, tok: influxql.ALTER},
{s: `AS`, tok: influxql.AS},
{s: `ASC`, tok: influxql.ASC},
{s: `BY`, tok: influxql.BY},
{s: `CREATE`, tok: influxql.CREATE},
{s: `CONTINUOUS`, tok: influxql.CONTINUOUS},
{s: `DATABASE`, tok: influxql.DATABASE},
{s: `DEFAULT`, tok: influxql.DEFAULT},
{s: `DELETE`, tok: influxql.DELETE},
{s: `DESC`, tok: influxql.DESC},
{s: `DROP`, tok: influxql.DROP},
{s: `DURATION`, tok: influxql.DURATION},
{s: `EXISTS`, tok: influxql.EXISTS},
{s: `EXPLAIN`, tok: influxql.EXPLAIN},
{s: `FIELD`, tok: influxql.FIELD},
{s: `FROM`, tok: influxql.FROM},
{s: `GRANT`, tok: influxql.GRANT},
{s: `GROUP`, tok: influxql.GROUP},
{s: `IF`, tok: influxql.IF},
{s: `INNER`, tok: influxql.INNER},
{s: `INSERT`, tok: influxql.INSERT},
{s: `INTO`, tok: influxql.INTO},
@ -114,13 +124,25 @@ func TestScanner_Scan(t *testing.T) {
{s: `LIST`, tok: influxql.LIST},
{s: `MEASUREMENT`, tok: influxql.MEASUREMENT},
{s: `MEASUREMENTS`, tok: influxql.MEASUREMENTS},
{s: `ON`, tok: influxql.ON},
{s: `ORDER`, tok: influxql.ORDER},
{s: `PASSWORD`, tok: influxql.PASSWORD},
{s: `POLICY`, tok: influxql.POLICY},
{s: `PRIVILEGES`, tok: influxql.PRIVILEGES},
{s: `QUERIES`, tok: influxql.QUERIES},
{s: `QUERY`, tok: influxql.QUERY},
{s: `READ`, tok: influxql.READ},
{s: `RETENTION`, tok: influxql.RETENTION},
{s: `REVOKE`, tok: influxql.REVOKE},
{s: `SELECT`, tok: influxql.SELECT},
{s: `SERIES`, tok: influxql.SERIES},
{s: `TAG`, tok: influxql.TAG},
{s: `TO`, tok: influxql.TO},
{s: `USER`, tok: influxql.USER},
{s: `VALUES`, tok: influxql.VALUES},
{s: `WHERE`, tok: influxql.WHERE},
{s: `WITH`, tok: influxql.WITH},
{s: `WRITE`, tok: influxql.WRITE},
{s: `explain`, tok: influxql.EXPLAIN}, // case insensitive
}

View File

@ -15,14 +15,14 @@ const (
literal_beg
// Literals
IDENT // main
NUMBER // 12345.67
DURATION // 13h
STRING // "abc"
BADSTRING // "abc
BADESCAPE // \q
TRUE // true
FALSE // false
IDENT // main
NUMBER // 12345.67
DURATION_VAL // 13h
STRING // "abc"
BADSTRING // "abc
BADESCAPE // \q
TRUE // true
FALSE // false
literal_end
operator_beg
@ -50,18 +50,26 @@ const (
keyword_beg
// Keywords
ALL
ALTER
AS
ASC
BY
CREATE
CONTINUOUS
DATABASE
DEFAULT
DELETE
DESC
DROP
DURATION
EXISTS
EXPLAIN
FIELD
FROM
GRANT
GROUP
IF
INNER
INSERT
INTO
@ -70,14 +78,26 @@ const (
LIST
MEASUREMENT
MEASUREMENTS
ON
ORDER
PASSWORD
POLICY
PRIVILEGES
QUERIES
QUERY
READ
REPLICATION
RETENTION
REVOKE
SELECT
SERIES
TAG
TO
USER
VALUES
WHERE
WITH
WRITE
keyword_end
)
@ -86,11 +106,12 @@ var tokens = [...]string{
EOF: "EOF",
WS: "WS",
IDENT: "IDENT",
NUMBER: "NUMBER",
STRING: "STRING",
TRUE: "TRUE",
FALSE: "FALSE",
IDENT: "IDENT",
NUMBER: "NUMBER",
DURATION_VAL: "DURATION_VAL",
STRING: "STRING",
TRUE: "TRUE",
FALSE: "FALSE",
ADD: "+",
SUB: "-",
@ -112,18 +133,26 @@ var tokens = [...]string{
COMMA: ",",
SEMICOLON: ";",
ALL: "ALL",
ALTER: "ALTER",
AS: "AS",
ASC: "ASC",
BY: "BY",
CREATE: "CREATE",
CONTINUOUS: "CONTINUOUS",
DATABASE: "DATABASE",
DEFAULT: "DEFAULT",
DELETE: "DELETE",
DESC: "DESC",
DROP: "DROP",
DURATION: "DURATION",
EXISTS: "EXISTS",
EXPLAIN: "EXPLAIN",
FIELD: "FIELD",
FROM: "FROM",
GRANT: "GRANT",
GROUP: "GROUP",
IF: "IF",
INNER: "INNER",
INSERT: "INSERT",
INTO: "INTO",
@ -132,14 +161,26 @@ var tokens = [...]string{
LIST: "LIST",
MEASUREMENT: "MEASUREMENT",
MEASUREMENTS: "MEASUREMENTS",
ON: "ON",
ORDER: "ORDER",
PASSWORD: "PASSWORD",
POLICY: "POLICY",
PRIVILEGES: "PRIVILEGES",
QUERIES: "QUERIES",
QUERY: "QUERY",
READ: "READ",
REPLICATION: "REPLICATION",
RETENTION: "RETENTION",
REVOKE: "REVOKE",
SELECT: "SELECT",
SERIES: "SERIES",
TAG: "TAG",
TO: "TO",
USER: "USER",
VALUES: "VALUES",
WHERE: "WHERE",
WITH: "WITH",
WRITE: "WRITE",
}
var keywords map[string]Token

View File

@ -664,6 +664,32 @@ func (s *Server) Users() (a []*User) {
return a
}
// AdminUserExists returns whether at least 1 admin-level user exists.
func (s *Server) AdminUserExists() bool {
for _, u := range s.users {
if u.Admin {
return true
}
}
return false
}
// Authenticate returns an authenticated user by username. If any error occurs,
// or the authentication credentials are invalid, an error is returned.
func (s *Server) Authenticate(username, password string) (*User, error) {
s.mu.Lock()
defer s.mu.Unlock()
u := s.users[username]
if u == nil {
return nil, fmt.Errorf("user not found")
}
err := u.Authenticate(password)
if err != nil {
return nil, fmt.Errorf("invalid credentials")
}
return u, nil
}
// CreateUser creates a user on the server.
func (s *Server) CreateUser(username, password string, admin bool) error {
c := &createUserCommand{Username: username, Password: password, Admin: admin}

View File

@ -197,6 +197,48 @@ func TestServer_CreateUser(t *testing.T) {
} else if bcrypt.CompareHashAndPassword([]byte(u.Hash), []byte("pass")) != nil {
t.Fatal("invalid password")
}
// Verify that the authenticated user exists.
u, err := s.Authenticate("susy", "pass")
if err != nil {
t.Fatalf("error fetching authenticated user")
} else if u.Name != "susy" {
t.Fatalf("username mismatch: %v", u.Name)
} else if !u.Admin {
t.Fatalf("admin mismatch: %v", u.Admin)
} else if bcrypt.CompareHashAndPassword([]byte(u.Hash), []byte("pass")) != nil {
t.Fatal("invalid password")
}
}
// Ensure the server correctly detects when there is an admin user.
func TestServer_AdminUserExists(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
// A server should start up without any admin user.
if s.AdminUserExists() {
t.Fatalf("admin user unexpectedly exists at start-up")
}
// Create a non-admin user and verify Server agrees there is no admin user.
if err := s.CreateUser("bert", "pass", false); err != nil {
t.Fatal(err)
}
s.Restart()
if s.AdminUserExists() {
t.Fatalf("admin user unexpectedly exists")
}
// Next, create an admin user, and ensure the Server agrees there is an admin user.
if err := s.CreateUser("ernie", "pass", true); err != nil {
t.Fatal(err)
}
s.Restart()
if !s.AdminUserExists() {
t.Fatalf("admin user does not exist")
}
}
// Ensure the server returns an error when creating an user without a name.
@ -265,6 +307,27 @@ func TestServer_Users(t *testing.T) {
}
}
// Ensure the server does not return non-existent users
func TestServer_NonExistingUsers(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
// Create some users.
s.CreateUser("susy", "pass", false)
s.CreateUser("john", "pass2", false)
s.Restart()
// Ask for users that should not be returned.
u := s.User("bob")
if u != nil {
t.Fatalf("unexpected user found")
}
u, err := s.Authenticate("susy", "wrong_password")
if err == nil {
t.Fatalf("unexpected authenticated user found")
}
}
// Ensure the database can create a new retention policy.
func TestServer_CreateRetentionPolicy(t *testing.T) {
s := OpenServer(NewMessagingClient())