Add LIST/DROP SERIES and LIST/DROP CONTINUOUS QUERIES.

pull/1159/head
Ben Johnson 2014-11-22 16:33:21 -07:00
parent 6ef8d4634a
commit 91f16db3af
6 changed files with 303 additions and 53 deletions
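For reference, the four statement forms this commit teaches the parser to recognize, driven through the package's public API. A minimal sketch (not part of the commit), using the import path from the test file below:

package main

import (
    "fmt"
    "strings"

    "github.com/influxdb/influxdb/influxql"
)

func main() {
    // Each string exercises one of the statement types added in this commit.
    for _, s := range []string{
        `LIST SERIES`,
        `DROP SERIES myseries`,
        `LIST CONTINUOUS QUERIES`,
        `DROP CONTINUOUS QUERY 12`,
    } {
        stmt, err := influxql.NewParser(strings.NewReader(s)).ParseStatement()
        if err != nil {
            fmt.Printf("%q: parse error: %s\n", s, err)
            continue
        }
        fmt.Printf("%q parsed as %T\n", s, stmt)
    }
}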


@ -20,10 +20,16 @@ type Node interface {
node()
}
func (_ *Query) node() {}
func (_ Statements) node() {}
func (_ *SelectStatement) node() {}
func (_ *DeleteStatement) node() {}
func (_ *ListSeriesStatement) node() {}
func (_ *DropSeriesStatement) node() {}
func (_ *ListContinuousQueriesStatement) node() {}
func (_ *DropContinuousQueryStatement) node() {}
func (_ Fields) node() {}
func (_ *Field) node() {}
func (_ Dimensions) node() {}
@ -54,8 +60,12 @@ type Statement interface {
stmt()
}
func (_ *SelectStatement) stmt() {}
func (_ *DeleteStatement) stmt() {}
func (_ *ListSeriesStatement) stmt() {}
func (_ *DropSeriesStatement) stmt() {}
func (_ *ListContinuousQueriesStatement) stmt() {}
func (_ *DropContinuousQueryStatement) stmt() {}
// Expr represents an expression that can be evaluated to a value.
type Expr interface {
@ -113,6 +123,22 @@ type DeleteStatement struct {
Condition Expr
}
// ListSeriesStatement represents a command for listing series in the database.
type ListSeriesStatement struct{}
// DropSeriesStatement represents a command for removing a series from the database.
type DropSeriesStatement struct {
Name string
}
// ListContinuousQueriesStatement represents a command for listing continuous queries.
type ListContinuousQueriesStatement struct{}
// DropContinuousQueryStatement represents a command for removing a continuous query.
type DropContinuousQueryStatement struct {
ID int
}
// Fields represents a list of fields.
type Fields []*Field
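Because each new statement type implements both the Node and Statement interfaces above, a caller can recover the concrete type with a type switch. A minimal dispatch sketch (not part of the commit):

package main

import (
    "fmt"
    "strings"

    "github.com/influxdb/influxdb/influxql"
)

func main() {
    stmt, _ := influxql.NewParser(strings.NewReader(`DROP CONTINUOUS QUERY 12`)).ParseStatement()

    // Dispatch on the concrete statement type to reach the new fields.
    switch s := stmt.(type) {
    case *influxql.ListSeriesStatement:
        fmt.Println("list series")
    case *influxql.DropSeriesStatement:
        fmt.Println("drop series:", s.Name)
    case *influxql.ListContinuousQueriesStatement:
        fmt.Println("list continuous queries")
    case *influxql.DropContinuousQueryStatement:
        fmt.Println("drop continuous query:", s.ID)
    }
}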


@ -19,13 +19,51 @@ func NewParser(r io.Reader) *Parser {
return &Parser{s: newBufScanner(r)}
}
// ParseQuery parses an InfluxQL string and returns a Query AST object.
func (p *Parser) ParseQuery() (*Query, error) {
var statements Statements
for {
// Read statements until we reach the end.
if tok, _, _ := p.scanIgnoreWhitespace(); tok == EOF {
break
}
p.unscan()
// Read the next statement.
s, err := p.ParseStatement()
if err != nil {
return nil, err
}
statements = append(statements, s)
}
return &Query{Statements: statements}, nil
}
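ParseQuery simply calls ParseStatement until it reaches EOF, so one reader can hold several semicolon-terminated statements. A minimal usage sketch (not part of the commit), mirroring the multi-statement test further down:

package main

import (
    "fmt"
    "strings"

    "github.com/influxdb/influxdb/influxql"
)

func main() {
    q, err := influxql.NewParser(strings.NewReader(`LIST SERIES; DROP SERIES cpu`)).ParseQuery()
    if err != nil {
        fmt.Println("parse error:", err)
        return
    }
    fmt.Println(len(q.Statements)) // 2
}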
// ParseStatement parses an InfluxQL string and returns a Statement AST object.
func (p *Parser) ParseStatement() (Statement, error) {
// Inspect the first token.
tok, pos, lit := p.scanIgnoreWhitespace()
switch tok {
case SELECT:
return p.parseSelectStatement()
case DELETE:
return p.parseDeleteStatement()
case LIST:
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == SERIES {
return p.parseListSeriesStatement()
} else if tok == CONTINUOUS {
return p.parseListContinuousQueriesStatement()
} else {
return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS"}, pos)
}
case DROP:
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == SERIES {
return p.parseDropSeriesStatement()
} else if tok == CONTINUOUS {
return p.parseDropContinuousQueryStatement()
} else {
return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS"}, pos)
}
default:
return nil, newParseError(tokstr(tok, lit), []string{"SELECT"}, pos)
}
@ -78,6 +116,116 @@ func (p *Parser) parseSelectStatement() (*SelectStatement, error) {
}
stmt.Ascending = ascending
// Expect a semicolon or EOF at the end
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != SEMICOLON && tok != EOF {
return nil, newParseError(tokstr(tok, lit), []string{";", "EOF"}, pos)
}
return stmt, nil
}
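parseSelectStatement, like every statement parser in this change, finishes by requiring a semicolon or EOF, so any trailing token surfaces as a parse error. A small sketch of that failure mode (not part of the commit), with the error text taken from the parser tests below:

package main

import (
    "fmt"
    "strings"

    "github.com/influxdb/influxdb/influxql"
)

func main() {
    _, err := influxql.NewParser(strings.NewReader(`SELECT field FROM "series" WHERE X Y`)).ParseStatement()
    fmt.Println(err) // found Y, expected ;, EOF at line 1, char 36
}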
// parseDeleteStatement parses a delete string and returns a DeleteStatement.
// This function assumes the DELETE token has already been consumed.
func (p *Parser) parseDeleteStatement() (*DeleteStatement, error) {
stmt := &DeleteStatement{}
// Parse source: "FROM IDENT".
source, err := p.parseSource()
if err != nil {
return nil, err
}
stmt.Source = source
// Parse condition: "WHERE EXPR".
condition, err := p.parseCondition()
if err != nil {
return nil, err
}
stmt.Condition = condition
// Expect a semicolon or EOF at the end
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != SEMICOLON && tok != EOF {
return nil, newParseError(tokstr(tok, lit), []string{";", "EOF"}, pos)
}
return stmt, nil
}
// parseListSeriesStatement parses a string and returns a ListSeriesStatement.
// This function assumes the "LIST SERIES" tokens have already been consumed.
func (p *Parser) parseListSeriesStatement() (*ListSeriesStatement, error) {
stmt := &ListSeriesStatement{}
// Expect a semicolon or EOF at the end
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != SEMICOLON && tok != EOF {
return nil, newParseError(tokstr(tok, lit), []string{";", "EOF"}, pos)
}
return stmt, nil
}
// parseDropSeriesStatement parses a string and returns a DropSeriesStatement.
// This function assumes the "DROP SERIES" tokens have already been consumed.
func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {
stmt := &DropSeriesStatement{}
// Read the name of the series to drop.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
}
stmt.Name = lit
// Expect a semicolon or EOF at the end
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != SEMICOLON && tok != EOF {
return nil, newParseError(tokstr(tok, lit), []string{";", "EOF"}, pos)
}
return stmt, nil
}
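DROP SERIES takes the series name as either an identifier or a string token and stores it in DropSeriesStatement.Name. A minimal sketch (not part of the commit):

package main

import (
    "fmt"
    "strings"

    "github.com/influxdb/influxdb/influxql"
)

func main() {
    stmt, err := influxql.NewParser(strings.NewReader(`DROP SERIES myseries`)).ParseStatement()
    if err != nil {
        fmt.Println("parse error:", err)
        return
    }
    fmt.Println(stmt.(*influxql.DropSeriesStatement).Name) // myseries
}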
// parseListContinuousQueriesStatement parses a string and returns a ListContinuousQueriesStatement.
// This function assumes the "LIST CONTINUOUS" tokens have already been consumed.
func (p *Parser) parseListContinuousQueriesStatement() (*ListContinuousQueriesStatement, error) {
stmt := &ListContinuousQueriesStatement{}
// Expect a "QUERIES" token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != QUERIES {
return nil, newParseError(tokstr(tok, lit), []string{"QUERIES"}, pos)
}
// Expect a semicolon or EOF at the end
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != SEMICOLON && tok != EOF {
return nil, newParseError(tokstr(tok, lit), []string{";", "EOF"}, pos)
}
return stmt, nil
}
// parseDropContinuousQueryStatement parses a string and returns a DropContinuousQueryStatement.
// This function assumes the "DROP CONTINUOUS" tokens have already been consumed.
func (p *Parser) parseDropContinuousQueryStatement() (*DropContinuousQueryStatement, error) {
stmt := &DropContinuousQueryStatement{}
// Expect a "QUERY" token.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != QUERY {
return nil, newParseError(tokstr(tok, lit), []string{"QUERY"}, pos)
}
// Read the id of the query to drop.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != NUMBER {
return nil, newParseError(tokstr(tok, lit), []string{"integer"}, pos)
} else if strings.Contains(lit, ".") {
return nil, &ParseError{Message: "continuous query id must be an integer", Pos: pos}
}
stmt.ID, _ = strconv.Atoi(lit)
// Expect a semicolon or EOF at the end
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != SEMICOLON && tok != EOF {
return nil, newParseError(tokstr(tok, lit), []string{";", "EOF"}, pos)
}
return stmt, nil
}
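DROP CONTINUOUS QUERY requires a NUMBER with no fractional part; a fractional literal is rejected with a dedicated error. A minimal sketch (not part of the commit), with outputs matching the parser tests below:

package main

import (
    "fmt"
    "strings"

    "github.com/influxdb/influxdb/influxql"
)

func main() {
    stmt, _ := influxql.NewParser(strings.NewReader(`DROP CONTINUOUS QUERY 12`)).ParseStatement()
    fmt.Println(stmt.(*influxql.DropContinuousQueryStatement).ID) // 12

    _, err := influxql.NewParser(strings.NewReader(`DROP CONTINUOUS QUERY 12.5`)).ParseStatement()
    fmt.Println(err) // continuous query id must be an integer at line 1, char 23
}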
@ -243,10 +391,7 @@ func (p *Parser) parseLimit() (int, error) {
}
// Parse number.
n, _ := strconv.ParseInt(lit, 10, 64)
return int(n), nil
}
@ -286,7 +431,7 @@ func (p *Parser) ParseExpr() (Expr, error) {
for {
// If the next token is NOT an operator then return the expression.
op, _, _ := p.scanIgnoreWhitespace()
if !op.isOperator() {
p.unscan()
return expr, nil
}
@ -328,10 +473,7 @@ func (p *Parser) parseUnaryExpr() (Expr, error) {
case TRUE, FALSE:
return &BooleanLiteral{Val: (tok == TRUE)}, nil
case DURATION:
v, _ := ParseDuration(lit)
return &DurationLiteral{Val: v}, nil
default:
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string", "number", "bool"}, pos)
@ -357,9 +499,6 @@ func (p *Parser) consumeWhitespace() {
}
}
// curr returns the last read token from the underlying scanner.
func (p *Parser) curr() (tok Token, pos Pos, lit string) { return p.s.curr() }
// unscan pushes the previously read token back onto the buffer.
func (p *Parser) unscan() { p.s.Unscan() }


@ -9,6 +9,35 @@ import (
"github.com/influxdb/influxdb/influxql"
)
// Ensure the parser can parse a multi-statement query.
func TestParser_ParseQuery(t *testing.T) {
s := `SELECT a FROM b; SELECT c FROM d`
q, err := influxql.NewParser(strings.NewReader(s)).ParseQuery()
if err != nil {
t.Fatalf("unexpected error: %s", err)
} else if len(q.Statements) != 2 {
t.Fatalf("unexpected statement count: %d", len(q.Statements))
}
}
// Ensure the parser can parse an empty query.
func TestParser_ParseQuery_Empty(t *testing.T) {
q, err := influxql.NewParser(strings.NewReader(``)).ParseQuery()
if err != nil {
t.Fatalf("unexpected error: %s", err)
} else if len(q.Statements) != 0 {
t.Fatalf("unexpected statement count: %d", len(q.Statements))
}
}
// Ensure the parser can return an error from a malformed statement.
func TestParser_ParseQuery_ParseError(t *testing.T) {
_, err := influxql.NewParser(strings.NewReader(`SELECT`)).ParseQuery()
if err == nil || err.Error() != `found EOF, expected identifier, string, number, bool at line 1, char 8` {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure the parser can parse strings into Statement ASTs.
func TestParser_ParseStatement(t *testing.T) {
var tests = []struct {
@ -16,9 +45,9 @@ func TestParser_ParseStatement(t *testing.T) {
stmt influxql.Statement
err string
}{
// SELECT statement
{
s: `SELECT field1, field2 ,field3 AS field_x FROM myseries WHERE host = 'hosta.influxdb.org' GROUP BY 10h LIMIT 20 ORDER BY ASC;`,
stmt: &influxql.SelectStatement{
Fields: influxql.Fields{
&influxql.Field{Expr: &influxql.VarRef{Val: "field1"}},
@ -39,9 +68,74 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// DELETE statement
{
s: `DELETE FROM myseries WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DeleteStatement{
Source: &influxql.Series{Name: "myseries"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
},
},
},
// LIST SERIES statement
{
s: `LIST SERIES`,
stmt: &influxql.ListSeriesStatement{},
},
// DROP SERIES statement
{
s: `DROP SERIES myseries`,
stmt: &influxql.DropSeriesStatement{Name: "myseries"},
},
// LIST CONTINUOUS QUERIES statement
{
s: `LIST CONTINUOUS QUERIES`,
stmt: &influxql.ListContinuousQueriesStatement{},
},
// DROP CONTINUOUS QUERY statement
{
s: `DROP CONTINUOUS QUERY 12`,
stmt: &influxql.DropContinuousQueryStatement{ID: 12},
},
// Errors
{s: ``, err: `found EOF, expected SELECT at line 1, char 1`},
{s: `SELECT`, err: `found EOF, expected identifier, string, number, bool at line 1, char 8`},
{s: `SELECT field X`, err: `found X, expected FROM at line 1, char 14`},
{s: `SELECT field FROM "series" WHERE X Y`, err: `found Y, expected ;, EOF at line 1, char 36`},
{s: `SELECT field FROM "series" WHERE X +;`, err: `found ;, expected identifier, string, number, bool at line 1, char 37`},
{s: `SELECT field FROM myseries GROUP`, err: `found EOF, expected BY at line 1, char 34`},
{s: `SELECT field FROM myseries LIMIT`, err: `found EOF, expected number at line 1, char 34`},
{s: `SELECT field FROM myseries LIMIT 10.5`, err: `fractional parts not allowed in limit at line 1, char 34`},
{s: `SELECT field FROM myseries ORDER`, err: `found EOF, expected BY at line 1, char 34`},
{s: `SELECT field FROM myseries ORDER BY /`, err: `found /, expected ASC, DESC at line 1, char 37`},
{s: `SELECT field AS`, err: `found EOF, expected identifier, string at line 1, char 17`},
{s: `SELECT field FROM 12`, err: `found 12, expected identifier, string at line 1, char 19`},
{s: `SELECT field FROM myseries GROUP BY *`, err: `found *, expected identifier, string, number, bool at line 1, char 37`},
{s: `SELECT 1000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 FROM myseries`, err: `unable to parse number at line 1, char 8`},
{s: `SELECT 10.5h FROM myseries`, err: `found h, expected FROM at line 1, char 12`},
{s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`},
{s: `DELETE FROM`, err: `found EOF, expected identifier, string at line 1, char 13`},
{s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
{s: `DELETE FROM myseries 123`, err: `found 123, expected ;, EOF at line 1, char 22`},
{s: `LIST SERIES x`, err: `found x, expected ;, EOF at line 1, char 13`},
{s: `DROP SERIES`, err: `found EOF, expected identifier, string at line 1, char 13`},
{s: `DROP SERIES myseries X`, err: `found X, expected ;, EOF at line 1, char 22`},
{s: `LIST CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`},
{s: `LIST CONTINUOUS QUERIES x`, err: `found x, expected ;, EOF at line 1, char 25`},
{s: `LIST FOO`, err: `found FOO, expected SERIES, CONTINUOUS at line 1, char 6`},
{s: `DROP CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 17`},
{s: `DROP CONTINUOUS QUERY`, err: `found EOF, expected integer at line 1, char 23`},
{s: `DROP CONTINUOUS QUERY 12.5`, err: `continuous query id must be an integer at line 1, char 23`},
{s: `DROP CONTINUOUS QUERY 12 X`, err: `found X, expected ;, EOF at line 1, char 26`},
{s: `DROP FOO`, err: `found FOO, expected SERIES, CONTINUOUS at line 1, char 6`},
}
for i, tt := range tests {


@ -6,6 +6,7 @@ import (
"fmt"
"io"
"os"
"strings"
)
// Scanner represents a lexical scanner for InfluxQL.
@ -217,20 +218,23 @@ func (s *Scanner) scanNumber() (tok Token, pos Pos, lit string) {
s.r.unread()
}
// Attempt to read as a duration if it doesn't have a fractional part.
if !strings.Contains(buf.String(), ".") {
// If the next rune is a duration unit (u,µ,ms,s) then return a duration token
if ch0, _ := s.r.read(); ch0 == 'u' || ch0 == 'µ' || ch0 == 's' || ch0 == 'h' || ch0 == 'd' || ch0 == 'w' {
_, _ = buf.WriteRune(ch0)
return DURATION, pos, buf.String()
} else if ch0 == 'm' {
_, _ = buf.WriteRune(ch0)
if ch1, _ := s.r.read(); ch1 == 's' {
_, _ = buf.WriteRune(ch1)
} else {
s.r.unread()
}
return DURATION, pos, buf.String()
}
s.r.unread()
}
return NUMBER, pos, buf.String()
}
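With the new fractional-part guard, a number followed by a unit only becomes a DURATION token when it has no decimal point; otherwise the number stops at the fraction and the unit is left unconsumed. A minimal sketch (not part of the commit), assuming the package exposes a NewScanner constructor whose Scan has the same signature as bufScanner.Scan above:

package main

import (
    "fmt"
    "strings"

    "github.com/influxdb/influxdb/influxql"
)

func main() {
    // NewScanner and Scan signatures are assumed to mirror bufScanner.
    for _, in := range []string{`10.3s`, `10s`} {
        tok, _, lit := influxql.NewScanner(strings.NewReader(in)).Scan()
        fmt.Printf("%q -> %s %q\n", in, tok, lit) // "10.3s" -> NUMBER "10.3", "10s" -> DURATION "10s"
    }
}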
@ -295,13 +299,6 @@ func (s *bufScanner) Scan() (tok Token, pos Pos, lit string) {
// Unscan pushes the previously read token back onto the buffer.
func (s *bufScanner) Unscan() { s.n++ }
// Peek reads the next token from the scanner and immediately unscans it.
func (s *bufScanner) Peek() (tok Token, pos Pos, lit string) {
tok, pos, lit = s.Scan()
s.Unscan()
return
}
// curr returns the last read token.
func (s *bufScanner) curr() (tok Token, pos Pos, lit string) {
buf := &s.buf[(s.i-s.n+len(s.buf))%len(s.buf)]


@ -82,12 +82,13 @@ func TestScanner_Scan(t *testing.T) {
{s: `.`, tok: influxql.ILLEGAL, lit: `.`},
{s: `-.`, tok: influxql.SUB, lit: ``},
{s: `+.`, tok: influxql.ADD, lit: ``},
{s: `10.3s`, tok: influxql.NUMBER, lit: `10.3`},
// Durations
{s: `10u`, tok: influxql.DURATION, lit: `10u`},
{s: `10µ`, tok: influxql.DURATION, lit: `10µ`},
{s: `10ms`, tok: influxql.DURATION, lit: `10ms`},
{s: `-1s`, tok: influxql.DURATION, lit: `-1s`},
{s: `10m`, tok: influxql.DURATION, lit: `10m`},
{s: `10h`, tok: influxql.DURATION, lit: `10h`},
{s: `10d`, tok: influxql.DURATION, lit: `10d`},


@ -69,6 +69,7 @@ const (
MERGE
ORDER
QUERIES
QUERY
SELECT
SERIES
WHERE
@ -125,6 +126,7 @@ var tokens = [...]string{
MERGE: "MERGE",
ORDER: "ORDER",
QUERIES: "QUERIES",
QUERY: "QUERY",
SELECT: "SELECT",
SERIES: "SERIES",
WHERE: "WHERE",
@ -171,17 +173,8 @@ func (tok Token) Precedence() int {
return 0
}
// IsLiteral returns true for literal tokens.
func (tok Token) IsLiteral() bool { return tok > literal_beg && tok < literal_end }
// IsOperator returns true for operator tokens.
func (tok Token) IsOperator() bool { return tok > operator_beg && tok < operator_end }
// IsKeyword returns true for keyword tokens.
func (tok Token) IsKeyword() bool { return tok > keyword_beg && tok < keyword_end }
// MarshalJSON converts the token to JSON.
func (tok Token) MarshalJSON() ([]byte, error) { return []byte(`"` + tok.String() + `"`), nil }
// isOperator returns true for operator tokens.
func (tok Token) isOperator() bool { return tok > operator_beg && tok < operator_end }
// tokstr returns a literal if provided, otherwise returns the token string.
func tokstr(tok Token, lit string) string {