2015-05-28 04:06:09 +00:00
|
|
|
package meta
|
|
|
|
|
|
|
|
import (
	"fmt"
	"sort"

	"github.com/influxdb/influxdb/influxql"
)
|
|
|
|
|
|
|
|
// StatementExecutor translates InfluxQL queries to meta store methods.
|
|
|
|
type StatementExecutor struct {
|
|
|
|
Store interface {
|
|
|
|
Nodes() ([]NodeInfo, error)
|
|
|
|
|
|
|
|
Database(name string) (*DatabaseInfo, error)
|
|
|
|
Databases() ([]DatabaseInfo, error)
|
|
|
|
CreateDatabase(name string) (*DatabaseInfo, error)
|
|
|
|
DropDatabase(name string) error
|
|
|
|
|
|
|
|
DefaultRetentionPolicy(database string) (*RetentionPolicyInfo, error)
|
|
|
|
CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) (*RetentionPolicyInfo, error)
|
|
|
|
UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error
|
|
|
|
SetDefaultRetentionPolicy(database, name string) error
|
|
|
|
DropRetentionPolicy(database, name string) error
|
|
|
|
|
|
|
|
Users() ([]UserInfo, error)
|
|
|
|
CreateUser(name, password string, admin bool) (*UserInfo, error)
|
|
|
|
UpdateUser(name, password string) error
|
|
|
|
DropUser(name string) error
|
|
|
|
SetPrivilege(username, database string, p influxql.Privilege) error
|
2015-05-24 07:27:02 +00:00
|
|
|
UserPrivileges(username string) (map[string]influxql.Privilege, error)
|
2015-05-28 04:06:09 +00:00
|
|
|
|
|
|
|
CreateContinuousQuery(database, name, query string) error
|
|
|
|
DropContinuousQuery(database, name string) error
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// ExecuteStatement executes stmt against the meta store as user.
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
switch stmt := stmt.(type) {
|
|
|
|
case *influxql.CreateDatabaseStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeCreateDatabaseStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.DropDatabaseStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeDropDatabaseStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.ShowDatabasesStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeShowDatabasesStatement(stmt)
|
2015-05-24 07:27:02 +00:00
|
|
|
case *influxql.ShowGrantsForUserStatement:
|
|
|
|
return e.executeShowGrantsForUserStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.ShowServersStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeShowServersStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.CreateUserStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeCreateUserStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.SetPasswordUserStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeSetPasswordUserStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.DropUserStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeDropUserStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.ShowUsersStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeShowUsersStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.GrantStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeGrantStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.RevokeStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeRevokeStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.CreateRetentionPolicyStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeCreateRetentionPolicyStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.AlterRetentionPolicyStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeAlterRetentionPolicyStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.DropRetentionPolicyStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeDropRetentionPolicyStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.ShowRetentionPoliciesStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeShowRetentionPoliciesStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.CreateContinuousQueryStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeCreateContinuousQueryStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.DropContinuousQueryStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeDropContinuousQueryStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
case *influxql.ShowContinuousQueriesStatement:
|
2015-05-30 20:00:46 +00:00
|
|
|
return e.executeShowContinuousQueriesStatement(stmt)
|
2015-05-28 04:06:09 +00:00
|
|
|
default:
|
|
|
|
panic(fmt.Sprintf("unsupported statement type: %T", stmt))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeCreateDatabaseStatement(q *influxql.CreateDatabaseStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
_, err := e.Store.CreateDatabase(q.Name)
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeDropDatabaseStatement(q *influxql.DropDatabaseStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
return &influxql.Result{Err: e.Store.DropDatabase(q.Name)}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
dis, err := e.Store.Databases()
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
row := &influxql.Row{Name: "databases", Columns: []string{"name"}}
|
|
|
|
for _, di := range dis {
|
|
|
|
row.Values = append(row.Values, []interface{}{di.Name})
|
|
|
|
}
|
|
|
|
return &influxql.Result{Series: []*influxql.Row{row}}
|
|
|
|
}
|
|
|
|
|
2015-05-24 07:27:02 +00:00
|
|
|
func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement) *influxql.Result {
|
|
|
|
priv, err := e.Store.UserPrivileges(q.Name)
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
row := &influxql.Row{Columns: []string{"database", "privilege"}}
|
|
|
|
for d, p := range priv {
|
|
|
|
row.Values = append(row.Values, []interface{}{d, p.String()})
|
|
|
|
}
|
|
|
|
return &influxql.Result{Series: []*influxql.Row{row}}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
nis, err := e.Store.Nodes()
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
row := &influxql.Row{Columns: []string{"id", "url"}}
|
|
|
|
for _, ni := range nis {
|
|
|
|
row.Values = append(row.Values, []interface{}{ni.ID, "http://" + ni.Host})
|
|
|
|
}
|
|
|
|
return &influxql.Result{Series: []*influxql.Row{row}}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
admin := false
|
|
|
|
if q.Privilege != nil {
|
|
|
|
admin = (*q.Privilege == influxql.AllPrivileges)
|
|
|
|
}
|
|
|
|
|
|
|
|
_, err := e.Store.CreateUser(q.Name, q.Password, admin)
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordUserStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
return &influxql.Result{Err: e.Store.UpdateUser(q.Name, q.Password)}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
return &influxql.Result{Err: e.Store.DropUser(q.Name)}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
uis, err := e.Store.Users()
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
row := &influxql.Row{Columns: []string{"user", "admin"}}
|
2015-05-30 20:00:46 +00:00
|
|
|
for _, ui := range uis {
|
|
|
|
row.Values = append(row.Values, []interface{}{ui.Name, ui.Admin})
|
2015-05-28 04:06:09 +00:00
|
|
|
}
|
|
|
|
return &influxql.Result{Series: []*influxql.Row{row}}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
return &influxql.Result{Err: e.Store.SetPrivilege(stmt.User, stmt.On, stmt.Privilege)}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
return &influxql.Result{Err: e.Store.SetPrivilege(stmt.User, stmt.On, influxql.NoPrivileges)}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
rpi := NewRetentionPolicyInfo(stmt.Name)
|
|
|
|
rpi.Duration = stmt.Duration
|
|
|
|
rpi.ReplicaN = stmt.Replication
|
|
|
|
|
|
|
|
// Create new retention policy.
|
|
|
|
_, err := e.Store.CreateRetentionPolicy(stmt.Database, rpi)
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
// If requested, set new policy as the default.
|
|
|
|
if stmt.Default {
|
|
|
|
err = e.Store.SetDefaultRetentionPolicy(stmt.Database, stmt.Name)
|
|
|
|
}
|
|
|
|
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
rpu := &RetentionPolicyUpdate{
|
|
|
|
Duration: stmt.Duration,
|
|
|
|
ReplicaN: stmt.Replication,
|
|
|
|
}
|
|
|
|
|
|
|
|
// Update the retention policy.
|
|
|
|
err := e.Store.UpdateRetentionPolicy(stmt.Database, stmt.Name, rpu)
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
// If requested, set as default retention policy.
|
|
|
|
if stmt.Default {
|
|
|
|
err = e.Store.SetDefaultRetentionPolicy(stmt.Database, stmt.Name)
|
|
|
|
}
|
|
|
|
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeDropRetentionPolicyStatement(q *influxql.DropRetentionPolicyStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
return &influxql.Result{Err: e.Store.DropRetentionPolicy(q.Database, q.Name)}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
di, err := e.Store.Database(q.Database)
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
} else if di == nil {
|
|
|
|
return &influxql.Result{Err: ErrDatabaseNotFound}
|
|
|
|
}
|
|
|
|
|
|
|
|
row := &influxql.Row{Columns: []string{"name", "duration", "replicaN", "default"}}
|
|
|
|
for _, rpi := range di.RetentionPolicies {
|
|
|
|
row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name})
|
|
|
|
}
|
|
|
|
return &influxql.Result{Series: []*influxql.Row{row}}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeCreateContinuousQueryStatement(q *influxql.CreateContinuousQueryStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
return &influxql.Result{
|
|
|
|
Err: e.Store.CreateContinuousQuery(q.Database, q.Name, q.Source.String()),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
return &influxql.Result{
|
|
|
|
Err: e.Store.DropContinuousQuery(q.Database, q.Name),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) *influxql.Result {
|
2015-05-28 04:06:09 +00:00
|
|
|
dis, err := e.Store.Databases()
|
|
|
|
if err != nil {
|
|
|
|
return &influxql.Result{Err: err}
|
|
|
|
}
|
|
|
|
|
|
|
|
rows := []*influxql.Row{}
|
|
|
|
for _, di := range dis {
|
|
|
|
row := &influxql.Row{Columns: []string{"name", "query"}, Name: di.Name}
|
|
|
|
for _, cqi := range di.ContinuousQueries {
|
|
|
|
row.Values = append(row.Values, []interface{}{cqi.Name, cqi.Query})
|
|
|
|
}
|
|
|
|
rows = append(rows, row)
|
|
|
|
}
|
|
|
|
return &influxql.Result{Series: rows}
|
|
|
|
}
|