Introduce a new dynamic language mechanism

The language is now defined in a way similar to many HTTP routers with
the left prefix being placed into a parse tree and then eventually
invoking a function to parse the arguments.

This allows dynamically adding additional components to the parse tree
for either query language extensions or enterprise.
pull/8557/head
Jonathan A. Sternberg 2017-07-02 16:09:57 -05:00
parent 8550fabf89
commit 3e2501cbd1
4 changed files with 253 additions and 205 deletions

217
influxql/parse_tree.go Normal file
View File

@ -0,0 +1,217 @@
package influxql
import "fmt"
var Language = &ParseTree{}
type ParseTree struct {
Handlers map[Token]func(*Parser) (Statement, error)
Tokens map[Token]*ParseTree
Keys []string
}
// With passes the current parse tree to a function to allow nested functions.
func (t *ParseTree) With(fn func(*ParseTree)) {
fn(t)
}
// Group groups together a set of related handlers with a common token prefix.
func (t *ParseTree) Group(tokens ...Token) *ParseTree {
for _, tok := range tokens {
// Look for the parse tree for this token.
if subtree, ok := t.Tokens[tok]; ok {
t = subtree
continue
}
// No subtree exists yet. Verify that we don't have a conflicting
// statement.
if _, conflict := t.Handlers[tok]; conflict {
panic(fmt.Sprintf("conflict for token %s", tok))
}
// Create the new parse tree and register it inside of this one for
// later reference.
newT := &ParseTree{}
if t.Tokens == nil {
t.Tokens = make(map[Token]*ParseTree)
}
t.Tokens[tok] = newT
t.Keys = append(t.Keys, tok.String())
t = newT
}
return t
}
// Handle registers a handler to be invoked when seeing the given token.
func (t *ParseTree) Handle(tok Token, fn func(*Parser) (Statement, error)) {
// Verify that there is no conflict for this token in this parse tree.
if _, conflict := t.Tokens[tok]; conflict {
panic(fmt.Sprintf("conflict for token %s", tok))
}
if _, conflict := t.Handlers[tok]; conflict {
panic(fmt.Sprintf("conflict for token %s", tok))
}
if t.Handlers == nil {
t.Handlers = make(map[Token]func(*Parser) (Statement, error))
}
t.Handlers[tok] = fn
t.Keys = append(t.Keys, tok.String())
}
// Parse parses a statement using the language defined in the parse tree.
func (t *ParseTree) Parse(p *Parser) (Statement, error) {
for {
tok, pos, lit := p.scanIgnoreWhitespace()
if subtree, ok := t.Tokens[tok]; ok {
t = subtree
continue
}
if stmt, ok := t.Handlers[tok]; ok {
return stmt(p)
}
// There were no registered handlers. Return the valid tokens in the order they were added.
return nil, newParseError(tokstr(tok, lit), t.Keys, pos)
}
}
func (t *ParseTree) Clone() *ParseTree {
newT := &ParseTree{}
if t.Handlers != nil {
newT.Handlers = make(map[Token]func(*Parser) (Statement, error), len(t.Handlers))
for tok, handler := range t.Handlers {
newT.Handlers[tok] = handler
}
}
if t.Tokens != nil {
newT.Tokens = make(map[Token]*ParseTree, len(t.Tokens))
for tok, subtree := range t.Tokens {
newT.Tokens[tok] = subtree.Clone()
}
}
return newT
}
func init() {
Language.Handle(SELECT, func(p *Parser) (Statement, error) {
return p.parseSelectStatement(targetNotRequired)
})
Language.Handle(DELETE, func(p *Parser) (Statement, error) {
return p.parseDeleteStatement()
})
Language.Group(SHOW).With(func(show *ParseTree) {
show.Group(CONTINUOUS).Handle(QUERIES, func(p *Parser) (Statement, error) {
return p.parseShowContinuousQueriesStatement()
})
show.Handle(DATABASES, func(p *Parser) (Statement, error) {
return p.parseShowDatabasesStatement()
})
show.Handle(DIAGNOSTICS, func(p *Parser) (Statement, error) {
return p.parseShowDiagnosticsStatement()
})
show.Group(FIELD).Handle(KEYS, func(p *Parser) (Statement, error) {
return p.parseShowFieldKeysStatement()
})
show.Group(GRANTS).Handle(FOR, func(p *Parser) (Statement, error) {
return p.parseGrantsForUserStatement()
})
show.Handle(MEASUREMENTS, func(p *Parser) (Statement, error) {
return p.parseShowMeasurementsStatement()
})
show.Handle(QUERIES, func(p *Parser) (Statement, error) {
return p.parseShowQueriesStatement()
})
show.Group(RETENTION).Handle(POLICIES, func(p *Parser) (Statement, error) {
return p.parseShowRetentionPoliciesStatement()
})
show.Handle(SERIES, func(p *Parser) (Statement, error) {
return p.parseShowSeriesStatement()
})
show.Group(SHARD).Handle(GROUPS, func(p *Parser) (Statement, error) {
return p.parseShowShardGroupsStatement()
})
show.Handle(SHARDS, func(p *Parser) (Statement, error) {
return p.parseShowShardsStatement()
})
show.Handle(STATS, func(p *Parser) (Statement, error) {
return p.parseShowStatsStatement()
})
show.Handle(SUBSCRIPTIONS, func(p *Parser) (Statement, error) {
return p.parseShowSubscriptionsStatement()
})
show.Group(TAG).With(func(tag *ParseTree) {
tag.Handle(KEYS, func(p *Parser) (Statement, error) {
return p.parseShowTagKeysStatement()
})
tag.Handle(VALUES, func(p *Parser) (Statement, error) {
return p.parseShowTagValuesStatement()
})
})
show.Handle(USERS, func(p *Parser) (Statement, error) {
return p.parseShowUsersStatement()
})
})
Language.Group(CREATE).With(func(create *ParseTree) {
create.Group(CONTINUOUS).Handle(QUERY, func(p *Parser) (Statement, error) {
return p.parseCreateContinuousQueryStatement()
})
create.Handle(DATABASE, func(p *Parser) (Statement, error) {
return p.parseCreateDatabaseStatement()
})
create.Handle(USER, func(p *Parser) (Statement, error) {
return p.parseCreateUserStatement()
})
create.Group(RETENTION).Handle(POLICY, func(p *Parser) (Statement, error) {
return p.parseCreateRetentionPolicyStatement()
})
create.Handle(SUBSCRIPTION, func(p *Parser) (Statement, error) {
return p.parseCreateSubscriptionStatement()
})
})
Language.Group(DROP).With(func(drop *ParseTree) {
drop.Group(CONTINUOUS).Handle(QUERY, func(p *Parser) (Statement, error) {
return p.parseDropContinuousQueryStatement()
})
drop.Handle(DATABASE, func(p *Parser) (Statement, error) {
return p.parseDropDatabaseStatement()
})
drop.Handle(MEASUREMENT, func(p *Parser) (Statement, error) {
return p.parseDropMeasurementStatement()
})
drop.Group(RETENTION).Handle(POLICY, func(p *Parser) (Statement, error) {
return p.parseDropRetentionPolicyStatement()
})
drop.Handle(SERIES, func(p *Parser) (Statement, error) {
return p.parseDropSeriesStatement()
})
drop.Handle(SHARD, func(p *Parser) (Statement, error) {
return p.parseDropShardStatement()
})
drop.Handle(SUBSCRIPTION, func(p *Parser) (Statement, error) {
return p.parseDropSubscriptionStatement()
})
drop.Handle(USER, func(p *Parser) (Statement, error) {
return p.parseDropUserStatement()
})
})
Language.Handle(GRANT, func(p *Parser) (Statement, error) {
return p.parseGrantStatement()
})
Language.Handle(REVOKE, func(p *Parser) (Statement, error) {
return p.parseRevokeStatement()
})
Language.Group(ALTER, RETENTION).Handle(POLICY, func(p *Parser) (Statement, error) {
return p.parseAlterRetentionPolicyStatement()
})
Language.Group(SET, PASSWORD).Handle(FOR, func(p *Parser) (Statement, error) {
return p.parseSetPasswordUserStatement()
})
Language.Group(KILL).Handle(QUERY, func(p *Parser) (Statement, error) {
return p.parseKillQueryStatement()
})
}

View File

@ -0,0 +1,32 @@
package influxql_test
import (
"reflect"
"strings"
"testing"
"github.com/influxdata/influxdb/influxql"
)
func TestParseTree_Clone(t *testing.T) {
// Clone the default language parse tree and add a new syntax node.
language := influxql.Language.Clone()
language.Group(influxql.CREATE).Handle(influxql.STATS, func(p *influxql.Parser) (influxql.Statement, error) {
return &influxql.ShowStatsStatement{}, nil
})
// Create a parser with CREATE STATS and parse the statement.
parser := influxql.NewParser(strings.NewReader(`CREATE STATS`))
stmt, err := language.Parse(parser)
if err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !reflect.DeepEqual(stmt, &influxql.ShowStatsStatement{}) {
t.Fatalf("unexpected statement returned from parser: %s", stmt)
}
// Recreate the parser and try parsing with the original parsing. This should fail.
parser = influxql.NewParser(strings.NewReader(`CREATE STATS`))
if _, err := parser.ParseStatement(); err == nil {
t.Fatal("expected error")
}
}

View File

@ -7,7 +7,6 @@ import (
"io"
"math"
"regexp"
"sort"
"strconv"
"strings"
"time"
@ -93,175 +92,7 @@ func (p *Parser) ParseQuery() (*Query, error) {
// ParseStatement parses an InfluxQL string and returns a Statement AST object.
func (p *Parser) ParseStatement() (Statement, error) {
// Inspect the first token.
tok, pos, lit := p.scanIgnoreWhitespace()
switch tok {
case SELECT:
return p.parseSelectStatement(targetNotRequired)
case DELETE:
return p.parseDeleteStatement()
case SHOW:
return p.parseShowStatement()
case CREATE:
return p.parseCreateStatement()
case DROP:
return p.parseDropStatement()
case GRANT:
return p.parseGrantStatement()
case REVOKE:
return p.parseRevokeStatement()
case ALTER:
return p.parseAlterStatement()
case SET:
return p.parseSetPasswordUserStatement()
case KILL:
return p.parseKillQueryStatement()
default:
return nil, newParseError(tokstr(tok, lit), []string{"SELECT", "DELETE", "SHOW", "CREATE", "DROP", "GRANT", "REVOKE", "ALTER", "SET", "KILL"}, pos)
}
}
// parseShowStatement parses a string and returns a list statement.
// This function assumes the SHOW token has already been consumed.
func (p *Parser) parseShowStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
switch tok {
case CONTINUOUS:
return p.parseShowContinuousQueriesStatement()
case GRANTS:
return p.parseGrantsForUserStatement()
case DATABASES:
return p.parseShowDatabasesStatement()
case FIELD:
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == KEYS {
return p.parseShowFieldKeysStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"KEYS"}, pos)
case MEASUREMENTS:
return p.parseShowMeasurementsStatement()
case QUERIES:
return p.parseShowQueriesStatement()
case RETENTION:
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == POLICIES {
return p.parseShowRetentionPoliciesStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"POLICIES"}, pos)
case SERIES:
return p.parseShowSeriesStatement()
case SHARD:
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == GROUPS {
return p.parseShowShardGroupsStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"GROUPS"}, pos)
case SHARDS:
return p.parseShowShardsStatement()
case STATS:
return p.parseShowStatsStatement()
case DIAGNOSTICS:
return p.parseShowDiagnosticsStatement()
case TAG:
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == KEYS {
return p.parseShowTagKeysStatement()
} else if tok == VALUES {
return p.parseShowTagValuesStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"KEYS", "VALUES"}, pos)
case USERS:
return p.parseShowUsersStatement()
case SUBSCRIPTIONS:
return p.parseShowSubscriptionsStatement()
}
showQueryKeywords := []string{
"CONTINUOUS",
"DATABASES",
"FIELD",
"GRANTS",
"MEASUREMENTS",
"QUERIES",
"RETENTION",
"SERIES",
"TAG",
"USERS",
"STATS",
"DIAGNOSTICS",
"SHARD",
"SHARDS",
"SUBSCRIPTIONS",
}
sort.Strings(showQueryKeywords)
return nil, newParseError(tokstr(tok, lit), showQueryKeywords, pos)
}
// parseCreateStatement parses a string and returns a create statement.
// This function assumes the CREATE token has already been consumed.
func (p *Parser) parseCreateStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == CONTINUOUS {
return p.parseCreateContinuousQueryStatement()
} else if tok == DATABASE {
return p.parseCreateDatabaseStatement()
} else if tok == USER {
return p.parseCreateUserStatement()
} else if tok == RETENTION {
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != POLICY {
return nil, newParseError(tokstr(tok, lit), []string{"POLICY"}, pos)
}
return p.parseCreateRetentionPolicyStatement()
} else if tok == SUBSCRIPTION {
return p.parseCreateSubscriptionStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"CONTINUOUS", "DATABASE", "USER", "RETENTION", "SUBSCRIPTION"}, pos)
}
// parseDropStatement parses a string and returns a drop statement.
// This function assumes the DROP token has already been consumed.
func (p *Parser) parseDropStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
switch tok {
case CONTINUOUS:
return p.parseDropContinuousQueryStatement()
case DATABASE:
return p.parseDropDatabaseStatement()
case MEASUREMENT:
return p.parseDropMeasurementStatement()
case RETENTION:
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != POLICY {
return nil, newParseError(tokstr(tok, lit), []string{"POLICY"}, pos)
}
return p.parseDropRetentionPolicyStatement()
case SERIES:
return p.parseDropSeriesStatement()
case SHARD:
return p.parseDropShardStatement()
case SUBSCRIPTION:
return p.parseDropSubscriptionStatement()
case USER:
return p.parseDropUserStatement()
default:
return nil, newParseError(tokstr(tok, lit), []string{"CONTINUOUS", "MEASUREMENT", "RETENTION", "SERIES", "SHARD", "SUBSCRIPTION", "USER"}, pos)
}
}
// parseAlterStatement parses a string and returns an alter statement.
// This function assumes the ALTER token has already been consumed.
func (p *Parser) parseAlterStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == RETENTION {
if tok, pos, lit = p.scanIgnoreWhitespace(); tok != POLICY {
return nil, newParseError(tokstr(tok, lit), []string{"POLICY"}, pos)
}
return p.parseAlterRetentionPolicyStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"RETENTION"}, pos)
return Language.Parse(p)
}
// parseSetPasswordUserStatement parses a string and returns a set statement.
@ -269,11 +100,6 @@ func (p *Parser) parseAlterStatement() (Statement, error) {
func (p *Parser) parseSetPasswordUserStatement() (*SetPasswordUserStatement, error) {
stmt := &SetPasswordUserStatement{}
// Consume the required PASSWORD FOR tokens.
if err := p.parseTokens([]Token{PASSWORD, FOR}); err != nil {
return nil, err
}
// Parse username
ident, err := p.parseIdent()
@ -299,10 +125,6 @@ func (p *Parser) parseSetPasswordUserStatement() (*SetPasswordUserStatement, err
// parseKillQueryStatement parses a string and returns a kill statement.
// This function assumes the KILL token has already been consumed.
func (p *Parser) parseKillQueryStatement() (*KillQueryStatement, error) {
if err := p.parseTokens([]Token{QUERY}); err != nil {
return nil, err
}
qid, err := p.parseUInt64()
if err != nil {
return nil, err
@ -1515,14 +1337,7 @@ func (p *Parser) parseDropShardStatement() (*DropShardStatement, error) {
// parseShowContinuousQueriesStatement parses a string and returns a ShowContinuousQueriesStatement.
// This function assumes the "SHOW CONTINUOUS" tokens have already been consumed.
func (p *Parser) parseShowContinuousQueriesStatement() (*ShowContinuousQueriesStatement, error) {
stmt := &ShowContinuousQueriesStatement{}
// Expect a "QUERIES" token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != QUERIES {
return nil, newParseError(tokstr(tok, lit), []string{"QUERIES"}, pos)
}
return stmt, nil
return &ShowContinuousQueriesStatement{}, nil
}
// parseGrantsForUserStatement parses a string and returns a ShowGrantsForUserStatement.
@ -1530,11 +1345,6 @@ func (p *Parser) parseShowContinuousQueriesStatement() (*ShowContinuousQueriesSt
func (p *Parser) parseGrantsForUserStatement() (*ShowGrantsForUserStatement, error) {
stmt := &ShowGrantsForUserStatement{}
// Expect a "FOR" token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FOR {
return nil, newParseError(tokstr(tok, lit), []string{"FOR"}, pos)
}
// Parse the name of the user to be displayed.
lit, err := p.parseIdent()
if err != nil {
@ -1548,8 +1358,7 @@ func (p *Parser) parseGrantsForUserStatement() (*ShowGrantsForUserStatement, err
// parseShowDatabasesStatement parses a string and returns a ShowDatabasesStatement.
// This function assumes the "SHOW DATABASE" tokens have already been consumed.
func (p *Parser) parseShowDatabasesStatement() (*ShowDatabasesStatement, error) {
stmt := &ShowDatabasesStatement{}
return stmt, nil
return &ShowDatabasesStatement{}, nil
}
// parseCreateContinuousQueriesStatement parses a string and returns a CreateContinuousQueryStatement.
@ -1557,11 +1366,6 @@ func (p *Parser) parseShowDatabasesStatement() (*ShowDatabasesStatement, error)
func (p *Parser) parseCreateContinuousQueryStatement() (*CreateContinuousQueryStatement, error) {
stmt := &CreateContinuousQueryStatement{}
// Expect a "QUERY" token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != QUERY {
return nil, newParseError(tokstr(tok, lit), []string{"QUERY"}, pos)
}
// Read the id of the query to create.
ident, err := p.parseIdent()
if err != nil {
@ -1880,11 +1684,6 @@ func (p *Parser) parseShowDiagnosticsStatement() (*ShowDiagnosticsStatement, err
func (p *Parser) parseDropContinuousQueryStatement() (*DropContinuousQueryStatement, error) {
stmt := &DropContinuousQueryStatement{}
// Expect a "QUERY" token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != QUERY {
return nil, newParseError(tokstr(tok, lit), []string{"QUERY"}, pos)
}
// Read the id of the query to drop.
ident, err := p.parseIdent()
if err != nil {

View File

@ -2716,7 +2716,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `CREATE CONTINUOUS QUERY`, err: `found EOF, expected identifier at line 1, char 25`},
{s: `CREATE CONTINUOUS QUERY cq ON db RESAMPLE FOR 5s BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(10s) END`, err: `FOR duration must be >= GROUP BY time duration: must be a minimum of 10s, got 5s`},
{s: `CREATE CONTINUOUS QUERY cq ON db RESAMPLE EVERY 10s FOR 5s BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(5s) END`, err: `FOR duration must be >= GROUP BY time duration: must be a minimum of 10s, got 5s`},
{s: `DROP FOO`, err: `found FOO, expected CONTINUOUS, MEASUREMENT, RETENTION, SERIES, SHARD, SUBSCRIPTION, USER at line 1, char 6`},
{s: `DROP FOO`, err: `found FOO, expected CONTINUOUS, DATABASE, MEASUREMENT, RETENTION, SERIES, SHARD, SUBSCRIPTION, USER at line 1, char 6`},
{s: `CREATE FOO`, err: `found FOO, expected CONTINUOUS, DATABASE, USER, RETENTION, SUBSCRIPTION at line 1, char 8`},
{s: `CREATE DATABASE`, err: `found EOF, expected identifier at line 1, char 17`},
{s: `CREATE DATABASE "testdb" WITH`, err: `found EOF, expected DURATION, NAME, REPLICATION, SHARD at line 1, char 31`},