influxdb/meta/statement_executor.go

411 lines
14 KiB
Go

package meta
import (
	"bytes"
	"fmt"
	"sort"
	"strconv"
	"time"

	"github.com/influxdb/influxdb/influxql"
	"github.com/influxdb/influxdb/models"
)
// StatementExecutor translates InfluxQL queries to meta store methods.
//
// It carries no state of its own; every statement handler forwards to the
// Store field. Store is an interface value, so calling a handler while it is
// nil panics.
type StatementExecutor struct {
// Store lists the meta store operations the statement handlers rely on.
Store interface {
// Cluster nodes and raft membership.
Node(id uint64) (ni *NodeInfo, err error)
Nodes() ([]NodeInfo, error)
Peers() ([]string, error)
Leader() string
DeleteNode(nodeID uint64, force bool) error
// Databases.
Database(name string) (*DatabaseInfo, error)
Databases() ([]DatabaseInfo, error)
CreateDatabase(name string) (*DatabaseInfo, error)
DropDatabase(name string) error
// Retention policies.
DefaultRetentionPolicy(database string) (*RetentionPolicyInfo, error)
CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) (*RetentionPolicyInfo, error)
UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error
SetDefaultRetentionPolicy(database, name string) error
DropRetentionPolicy(database, name string) error
// Users and privileges.
Users() ([]UserInfo, error)
CreateUser(name, password string, admin bool) (*UserInfo, error)
UpdateUser(name, password string) error
DropUser(name string) error
SetPrivilege(username, database string, p influxql.Privilege) error
SetAdminPrivilege(username string, admin bool) error
UserPrivileges(username string) (map[string]influxql.Privilege, error)
UserPrivilege(username, database string) (*influxql.Privilege, error)
// Continuous queries.
CreateContinuousQuery(database, name, query string) error
DropContinuousQuery(database, name string) error
// Subscriptions.
CreateSubscription(database, rp, name, mode string, destinations []string) error
DropSubscription(database, rp, name string) error
}
}
// ExecuteStatement executes stmt against the meta store.
//
// The statement is dispatched on its concrete type to the matching
// execute*Statement handler, and that handler's result is returned unchanged.
// Failures reported by a handler are carried in the result's Err field.
// ExecuteStatement panics when stmt is not one of the types listed below.
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
switch stmt := stmt.(type) {
case *influxql.CreateDatabaseStatement:
return e.executeCreateDatabaseStatement(stmt)
case *influxql.DropDatabaseStatement:
return e.executeDropDatabaseStatement(stmt)
case *influxql.ShowDatabasesStatement:
return e.executeShowDatabasesStatement(stmt)
case *influxql.ShowGrantsForUserStatement:
return e.executeShowGrantsForUserStatement(stmt)
case *influxql.ShowServersStatement:
return e.executeShowServersStatement(stmt)
case *influxql.CreateUserStatement:
return e.executeCreateUserStatement(stmt)
case *influxql.SetPasswordUserStatement:
return e.executeSetPasswordUserStatement(stmt)
case *influxql.DropUserStatement:
return e.executeDropUserStatement(stmt)
case *influxql.ShowUsersStatement:
return e.executeShowUsersStatement(stmt)
case *influxql.GrantStatement:
return e.executeGrantStatement(stmt)
case *influxql.GrantAdminStatement:
return e.executeGrantAdminStatement(stmt)
case *influxql.RevokeStatement:
return e.executeRevokeStatement(stmt)
case *influxql.RevokeAdminStatement:
return e.executeRevokeAdminStatement(stmt)
case *influxql.CreateRetentionPolicyStatement:
return e.executeCreateRetentionPolicyStatement(stmt)
case *influxql.AlterRetentionPolicyStatement:
return e.executeAlterRetentionPolicyStatement(stmt)
case *influxql.DropRetentionPolicyStatement:
return e.executeDropRetentionPolicyStatement(stmt)
case *influxql.ShowRetentionPoliciesStatement:
return e.executeShowRetentionPoliciesStatement(stmt)
case *influxql.CreateContinuousQueryStatement:
return e.executeCreateContinuousQueryStatement(stmt)
case *influxql.DropContinuousQueryStatement:
return e.executeDropContinuousQueryStatement(stmt)
case *influxql.ShowContinuousQueriesStatement:
return e.executeShowContinuousQueriesStatement(stmt)
case *influxql.ShowShardsStatement:
return e.executeShowShardsStatement(stmt)
case *influxql.ShowStatsStatement:
return e.executeShowStatsStatement(stmt)
case *influxql.DropServerStatement:
return e.executeDropServerStatement(stmt)
case *influxql.CreateSubscriptionStatement:
return e.executeCreateSubscriptionStatement(stmt)
case *influxql.DropSubscriptionStatement:
return e.executeDropSubscriptionStatement(stmt)
case *influxql.ShowSubscriptionsStatement:
return e.executeShowSubscriptionsStatement(stmt)
default:
// No handler exists for this statement type; %T reports the concrete type.
panic(fmt.Sprintf("unsupported statement type: %T", stmt))
}
}
// executeCreateDatabaseStatement asks the store to create the database q.Name.
// An ErrDatabaseExists failure is reported as success when q.IfNotExists is set;
// any other error is returned in the result.
func (e *StatementExecutor) executeCreateDatabaseStatement(q *influxql.CreateDatabaseStatement) *influxql.Result {
	if _, createErr := e.Store.CreateDatabase(q.Name); createErr != nil {
		alreadyExists := createErr == ErrDatabaseExists
		if !(alreadyExists && q.IfNotExists) {
			return &influxql.Result{Err: createErr}
		}
	}
	return &influxql.Result{}
}
// executeDropDatabaseStatement asks the store to drop the database q.Name and
// returns the store's error, if any, in the result.
func (e *StatementExecutor) executeDropDatabaseStatement(q *influxql.DropDatabaseStatement) *influxql.Result {
	dropErr := e.Store.DropDatabase(q.Name)
	return &influxql.Result{Err: dropErr}
}
// executeShowDatabasesStatement returns a single "databases" row with one
// value per database known to the store, holding the database name.
func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement) *influxql.Result {
	databases, err := e.Store.Databases()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	out := &models.Row{
		Name:    "databases",
		Columns: []string{"name"},
	}
	for i := range databases {
		out.Values = append(out.Values, []interface{}{databases[i].Name})
	}

	series := []*models.Row{out}
	return &influxql.Result{Series: series}
}
// executeShowGrantsForUserStatement returns one row listing, for the user
// q.Name, each database the user holds a privilege on together with the
// privilege's string form.
//
// The store hands the privileges back as a map, and Go randomizes map
// iteration order, so the database names are sorted first to give the same
// row order on every call.
func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGrantsForUserStatement) *influxql.Result {
	privs, err := e.Store.UserPrivileges(q.Name)
	if err != nil {
		return &influxql.Result{Err: err}
	}

	// Collect and sort the database names for a stable output order.
	dbs := make([]string, 0, len(privs))
	for db := range privs {
		dbs = append(dbs, db)
	}
	sort.Strings(dbs)

	row := &models.Row{Columns: []string{"database", "privilege"}}
	for _, db := range dbs {
		p := privs[db]
		row.Values = append(row.Values, []interface{}{db, p.String()})
	}
	return &influxql.Result{Series: []*models.Row{row}}
}
// executeShowServersStatement returns one row with a value per node: its ID,
// its host, whether the host appears in the store's peer list, and whether the
// host equals the store's reported leader.
func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersStatement) *influxql.Result {
	nodes, err := e.Store.Nodes()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	raftPeers, err := e.Store.Peers()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	leaderHost := e.Store.Leader()

	out := &models.Row{
		Columns: []string{"id", "cluster_addr", "raft", "raft-leader"},
	}
	for i := range nodes {
		node := &nodes[i]
		isPeer := contains(raftPeers, node.Host)
		isLeader := leaderHost == node.Host
		out.Values = append(out.Values, []interface{}{node.ID, node.Host, isPeer, isLeader})
	}
	return &influxql.Result{Series: []*models.Row{out}}
}
// executeDropServerStatement deletes the node q.NodeID from the store, passing
// q.Force through. The node is looked up first; a lookup error is returned
// as-is and a nil node yields ErrNodeNotFound.
func (e *StatementExecutor) executeDropServerStatement(q *influxql.DropServerStatement) *influxql.Result {
	node, lookupErr := e.Store.Node(q.NodeID)
	switch {
	case lookupErr != nil:
		return &influxql.Result{Err: lookupErr}
	case node == nil:
		return &influxql.Result{Err: ErrNodeNotFound}
	}
	return &influxql.Result{Err: e.Store.DeleteNode(q.NodeID, q.Force)}
}
// executeCreateUserStatement asks the store to create a user from q.Name,
// q.Password and q.Admin. The created user info is discarded; only the error
// is reported in the result.
func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement) *influxql.Result {
	var res influxql.Result
	_, res.Err = e.Store.CreateUser(q.Name, q.Password, q.Admin)
	return &res
}
// executeSetPasswordUserStatement passes q.Name and q.Password to the store's
// UpdateUser and returns its error, if any, in the result.
func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordUserStatement) *influxql.Result {
	updateErr := e.Store.UpdateUser(q.Name, q.Password)
	return &influxql.Result{Err: updateErr}
}
// executeDropUserStatement asks the store to drop the user q.Name and returns
// the store's error, if any, in the result.
func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStatement) *influxql.Result {
	dropErr := e.Store.DropUser(q.Name)
	return &influxql.Result{Err: dropErr}
}
// executeShowUsersStatement returns one row with a value per user in the
// store: the user's name and admin flag.
func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement) *influxql.Result {
	users, err := e.Store.Users()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	out := &models.Row{
		Columns: []string{"user", "admin"},
	}
	for i := range users {
		out.Values = append(out.Values, []interface{}{users[i].Name, users[i].Admin})
	}

	series := []*models.Row{out}
	return &influxql.Result{Series: series}
}
// executeGrantStatement sets stmt.Privilege for stmt.User on database stmt.On
// through the store and returns the store's error, if any, in the result.
func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) *influxql.Result {
	grantErr := e.Store.SetPrivilege(stmt.User, stmt.On, stmt.Privilege)
	return &influxql.Result{Err: grantErr}
}
// executeGrantAdminStatement turns the admin flag on for stmt.User through the
// store and returns the store's error, if any, in the result.
func (e *StatementExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdminStatement) *influxql.Result {
	const admin = true
	grantErr := e.Store.SetAdminPrivilege(stmt.User, admin)
	return &influxql.Result{Err: grantErr}
}
// executeRevokeStatement removes stmt.Privilege from what stmt.User holds on
// database stmt.On and stores the remaining privilege.
//
// Revoking influxql.AllPrivileges stores influxql.NoPrivileges directly.
// Otherwise the user's current privilege is fetched and the revoked bits are
// cleared from it. A lookup error is returned without touching the store.
func (e *StatementExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) *influxql.Result {
	priv := influxql.NoPrivileges

	// Revoking all privileges means there's no need to look at existing user privileges.
	if stmt.Privilege != influxql.AllPrivileges {
		current, err := e.Store.UserPrivilege(stmt.User, stmt.On)
		if err != nil {
			return &influxql.Result{Err: err}
		}

		// Store is an interface, so an implementation may return a nil
		// privilege with a nil error. Treat that as "nothing granted" rather
		// than dereferencing nil; priv then stays NoPrivileges.
		if current != nil {
			// Bit clear (AND NOT) the user's privilege with the revoked privilege.
			priv = *current &^ stmt.Privilege
		}
	}

	return &influxql.Result{Err: e.Store.SetPrivilege(stmt.User, stmt.On, priv)}
}
// executeRevokeAdminStatement turns the admin flag off for stmt.User through
// the store and returns the store's error, if any, in the result.
func (e *StatementExecutor) executeRevokeAdminStatement(stmt *influxql.RevokeAdminStatement) *influxql.Result {
	const admin = false
	revokeErr := e.Store.SetAdminPrivilege(stmt.User, admin)
	return &influxql.Result{Err: revokeErr}
}
// executeCreateRetentionPolicyStatement creates the retention policy stmt.Name
// on stmt.Database with the statement's duration and replication factor. When
// stmt.Default is set and creation succeeded, the new policy is then made the
// database's default. The first error encountered is returned in the result.
func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) *influxql.Result {
	policy := NewRetentionPolicyInfo(stmt.Name)
	policy.Duration = stmt.Duration
	policy.ReplicaN = stmt.Replication

	if _, err := e.Store.CreateRetentionPolicy(stmt.Database, policy); err != nil {
		return &influxql.Result{Err: err}
	}

	// Nothing more to do unless the policy should become the default.
	if !stmt.Default {
		return &influxql.Result{}
	}
	return &influxql.Result{Err: e.Store.SetDefaultRetentionPolicy(stmt.Database, stmt.Name)}
}
// executeAlterRetentionPolicyStatement applies the statement's duration and
// replication settings to the retention policy stmt.Name on stmt.Database.
// When stmt.Default is set and the update succeeded, the policy is then made
// the database's default. The first error encountered is returned in the result.
func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) *influxql.Result {
	update := &RetentionPolicyUpdate{
		Duration: stmt.Duration,
		ReplicaN: stmt.Replication,
	}

	if err := e.Store.UpdateRetentionPolicy(stmt.Database, stmt.Name, update); err != nil {
		return &influxql.Result{Err: err}
	}

	// Nothing more to do unless the policy should become the default.
	if !stmt.Default {
		return &influxql.Result{}
	}
	return &influxql.Result{Err: e.Store.SetDefaultRetentionPolicy(stmt.Database, stmt.Name)}
}
// executeDropRetentionPolicyStatement asks the store to drop the retention
// policy q.Name from q.Database and returns its error, if any, in the result.
func (e *StatementExecutor) executeDropRetentionPolicyStatement(q *influxql.DropRetentionPolicyStatement) *influxql.Result {
	dropErr := e.Store.DropRetentionPolicy(q.Database, q.Name)
	return &influxql.Result{Err: dropErr}
}
// executeShowRetentionPoliciesStatement returns one row with a value per
// retention policy of q.Database: name, duration as a string, replication
// factor, and whether it is the database's default policy. A nil database
// from the store yields ErrDatabaseNotFound.
func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) *influxql.Result {
	db, err := e.Store.Database(q.Database)
	if err != nil {
		return &influxql.Result{Err: err}
	}
	if db == nil {
		return &influxql.Result{Err: ErrDatabaseNotFound}
	}

	out := &models.Row{
		Columns: []string{"name", "duration", "replicaN", "default"},
	}
	for i := range db.RetentionPolicies {
		policy := db.RetentionPolicies[i]
		isDefault := db.DefaultRetentionPolicy == policy.Name
		out.Values = append(out.Values, []interface{}{
			policy.Name,
			policy.Duration.String(),
			policy.ReplicaN,
			isDefault,
		})
	}
	return &influxql.Result{Series: []*models.Row{out}}
}
// executeCreateContinuousQueryStatement registers a continuous query named
// q.Name on q.Database, storing the statement's own string form as the query
// text. The store's error, if any, is returned in the result.
func (e *StatementExecutor) executeCreateContinuousQueryStatement(q *influxql.CreateContinuousQueryStatement) *influxql.Result {
	createErr := e.Store.CreateContinuousQuery(q.Database, q.Name, q.String())
	return &influxql.Result{Err: createErr}
}
// executeDropContinuousQueryStatement asks the store to drop the continuous
// query q.Name from q.Database and returns its error, if any, in the result.
func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) *influxql.Result {
	dropErr := e.Store.DropContinuousQuery(q.Database, q.Name)
	return &influxql.Result{Err: dropErr}
}
// executeShowContinuousQueriesStatement returns one row per database, named
// after the database, whose values are the name and query text of each of its
// continuous queries. A row is emitted for every database, including those
// with no continuous queries.
func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) *influxql.Result {
	databases, err := e.Store.Databases()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	series := make([]*models.Row, 0)
	for i := range databases {
		db := &databases[i]
		out := &models.Row{
			Name:    db.Name,
			Columns: []string{"name", "query"},
		}
		for j := range db.ContinuousQueries {
			cq := db.ContinuousQueries[j]
			out.Values = append(out.Values, []interface{}{cq.Name, cq.Query})
		}
		series = append(series, out)
	}
	return &influxql.Result{Series: series}
}
// executeCreateSubscriptionStatement asks the store to create the subscription
// q.Name on q.Database / q.RetentionPolicy with the statement's mode and
// destinations, and returns the store's error, if any, in the result.
func (e *StatementExecutor) executeCreateSubscriptionStatement(q *influxql.CreateSubscriptionStatement) *influxql.Result {
	createErr := e.Store.CreateSubscription(q.Database, q.RetentionPolicy, q.Name, q.Mode, q.Destinations)
	return &influxql.Result{Err: createErr}
}
// executeDropSubscriptionStatement asks the store to drop the subscription
// q.Name from q.Database / q.RetentionPolicy and returns its error, if any,
// in the result.
func (e *StatementExecutor) executeDropSubscriptionStatement(q *influxql.DropSubscriptionStatement) *influxql.Result {
	dropErr := e.Store.DropSubscription(q.Database, q.RetentionPolicy, q.Name)
	return &influxql.Result{Err: dropErr}
}
// executeShowSubscriptionsStatement returns one row per database that has at
// least one subscription. Each value holds the retention policy name and the
// subscription's name, mode and destinations. Databases without subscriptions
// are omitted from the result.
func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement) *influxql.Result {
	databases, err := e.Store.Databases()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	series := make([]*models.Row, 0)
	for i := range databases {
		db := &databases[i]
		out := &models.Row{
			Name:    db.Name,
			Columns: []string{"retention_policy", "name", "mode", "destinations"},
		}
		for _, policy := range db.RetentionPolicies {
			for _, sub := range policy.Subscriptions {
				out.Values = append(out.Values, []interface{}{policy.Name, sub.Name, sub.Mode, sub.Destinations})
			}
		}
		// Skip databases that produced no subscription values.
		if len(out.Values) == 0 {
			continue
		}
		series = append(series, out)
	}
	return &influxql.Result{Series: series}
}
// executeShowShardsStatement returns one row per database, named after the
// database, with a value for every shard in its non-deleted shard groups.
// Times are formatted as RFC 3339 in UTC; expiry_time is the shard group's end
// time plus the retention policy's duration, and owners is the comma-separated
// list of owning node IDs. A row is emitted for every database, even when it
// lists no shards.
func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) *influxql.Result {
	databases, err := e.Store.Databases()
	if err != nil {
		return &influxql.Result{Err: err}
	}

	series := make([]*models.Row, 0)
	for i := range databases {
		db := &databases[i]
		out := &models.Row{
			Name:    db.Name,
			Columns: []string{"id", "database", "retention_policy", "shard_group", "start_time", "end_time", "expiry_time", "owners"},
		}

		for _, policy := range db.RetentionPolicies {
			for _, group := range policy.ShardGroups {
				// Shards associated with deleted shard groups are effectively
				// deleted, so they are not listed.
				if group.Deleted() {
					continue
				}

				// The time columns depend only on the shard group, so format
				// them once for all of its shards.
				startTime := group.StartTime.UTC().Format(time.RFC3339)
				endTime := group.EndTime.UTC().Format(time.RFC3339)
				expiryTime := group.EndTime.Add(policy.Duration).UTC().Format(time.RFC3339)

				for _, shard := range group.Shards {
					owners := make([]uint64, 0, len(shard.Owners))
					for _, owner := range shard.Owners {
						owners = append(owners, owner.NodeID)
					}

					out.Values = append(out.Values, []interface{}{
						shard.ID,
						db.Name,
						policy.Name,
						group.ID,
						startTime,
						endTime,
						expiryTime,
						joinUint64(owners),
					})
				}
			}
		}
		series = append(series, out)
	}
	return &influxql.Result{Series: series}
}
// executeShowStatsStatement always returns a result whose Err states that
// SHOW STATS is not implemented; the statement itself is not inspected.
func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) *influxql.Result {
	notImplemented := fmt.Errorf("SHOW STATS is not implemented yet")
	return &influxql.Result{Err: notImplemented}
}
// joinUint64 renders a as base-10 numbers separated by commas, with no
// surrounding whitespace. A nil or empty slice produces the empty string.
func joinUint64(a []uint64) string {
	var out bytes.Buffer
	for idx := range a {
		// The separator goes before every element except the first.
		if idx > 0 {
			out.WriteByte(',')
		}
		out.WriteString(strconv.FormatUint(a[idx], 10))
	}
	return out.String()
}