fix #2555: add backreference in CQs
Add new query syntax to allow the following in CQs: INTO "1hPolicy".:MEASUREMENTpull/3876/head
parent
c1d2f11367
commit
99a22c174b
|
@ -1508,6 +1508,9 @@ func (t *Target) String() string {
|
|||
var buf bytes.Buffer
|
||||
_, _ = buf.WriteString("INTO ")
|
||||
_, _ = buf.WriteString(t.Measurement.String())
|
||||
if t.Measurement.Name == "" {
|
||||
_, _ = buf.WriteString(":MEASUREMENT")
|
||||
}
|
||||
|
||||
return buf.String()
|
||||
}
|
||||
|
@ -2166,6 +2169,7 @@ type Measurement struct {
|
|||
RetentionPolicy string
|
||||
Name string
|
||||
Regex *RegexLiteral
|
||||
Parent Node
|
||||
}
|
||||
|
||||
// String returns a string representation of the measurement.
|
||||
|
|
|
@ -488,6 +488,9 @@ func (p *Parser) parseSegmentedIdents() ([]string, error) {
|
|||
if ch := p.peekRune(); ch == '/' {
|
||||
// Next segment is a regex so we're done.
|
||||
break
|
||||
} else if ch == ':' {
|
||||
// Next segment is context-specific so let caller handle it.
|
||||
break
|
||||
} else if ch == '.' {
|
||||
// Add an empty identifier.
|
||||
idents = append(idents, "")
|
||||
|
@ -716,7 +719,7 @@ func (p *Parser) parseSelectStatement(tr targetRequirement) (*SelectStatement, e
|
|||
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FROM {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
|
||||
}
|
||||
if stmt.Sources, err = p.parseSources(); err != nil {
|
||||
if stmt.Sources, err = p.parseSources(stmt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -799,7 +802,19 @@ func (p *Parser) parseTarget(tr targetRequirement) (*Target, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if len(idents) < 3 {
|
||||
// Check for source measurement reference.
|
||||
if ch := p.peekRune(); ch == ':' {
|
||||
if err := p.parseTokens([]Token{COLON, MEASUREMENT}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Append empty measurement name.
|
||||
idents = append(idents, "")
|
||||
}
|
||||
}
|
||||
|
||||
t := &Target{Measurement: &Measurement{}}
|
||||
t.Measurement.Parent = t
|
||||
|
||||
switch len(idents) {
|
||||
case 1:
|
||||
|
@ -825,7 +840,7 @@ func (p *Parser) parseDeleteStatement() (*DeleteStatement, error) {
|
|||
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FROM {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
|
||||
}
|
||||
source, err := p.parseSource()
|
||||
source, err := p.parseSource(stmt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -849,7 +864,7 @@ func (p *Parser) parseShowSeriesStatement() (*ShowSeriesStatement, error) {
|
|||
|
||||
// Parse optional FROM.
|
||||
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
|
||||
if stmt.Sources, err = p.parseSources(); err != nil {
|
||||
if stmt.Sources, err = p.parseSources(stmt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
|
@ -936,7 +951,7 @@ func (p *Parser) parseShowTagKeysStatement() (*ShowTagKeysStatement, error) {
|
|||
|
||||
// Parse optional source.
|
||||
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
|
||||
if stmt.Sources, err = p.parseSources(); err != nil {
|
||||
if stmt.Sources, err = p.parseSources(stmt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
|
@ -974,7 +989,7 @@ func (p *Parser) parseShowTagValuesStatement() (*ShowTagValuesStatement, error)
|
|||
|
||||
// Parse optional source.
|
||||
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
|
||||
if stmt.Sources, err = p.parseSources(); err != nil {
|
||||
if stmt.Sources, err = p.parseSources(stmt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
|
@ -1064,7 +1079,7 @@ func (p *Parser) parseShowFieldKeysStatement() (*ShowFieldKeysStatement, error)
|
|||
|
||||
// Parse optional source.
|
||||
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
|
||||
if stmt.Sources, err = p.parseSources(); err != nil {
|
||||
if stmt.Sources, err = p.parseSources(stmt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
|
@ -1114,7 +1129,7 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {
|
|||
|
||||
if tok == FROM {
|
||||
// Parse source.
|
||||
if stmt.Sources, err = p.parseSources(); err != nil {
|
||||
if stmt.Sources, err = p.parseSources(stmt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
|
@ -1549,11 +1564,11 @@ func (p *Parser) parseAlias() (string, error) {
|
|||
}
|
||||
|
||||
// parseSources parses a comma delimited list of sources.
|
||||
func (p *Parser) parseSources() (Sources, error) {
|
||||
func (p *Parser) parseSources(parent Node) (Sources, error) {
|
||||
var sources Sources
|
||||
|
||||
for {
|
||||
s, err := p.parseSource()
|
||||
s, err := p.parseSource(parent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1578,8 +1593,8 @@ func (p *Parser) peekRune() rune {
|
|||
return r
|
||||
}
|
||||
|
||||
func (p *Parser) parseSource() (Source, error) {
|
||||
m := &Measurement{}
|
||||
func (p *Parser) parseSource(parent Node) (Source, error) {
|
||||
m := &Measurement{Parent: parent}
|
||||
|
||||
// Attempt to parse a regex.
|
||||
re, err := p.parseRegex()
|
||||
|
|
|
@ -921,6 +921,32 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
},
|
||||
},
|
||||
|
||||
// CREATE CONTINUOUS QUERY with backreference measurement name
|
||||
{
|
||||
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT mean(value) INTO "policy1".:measurement FROM /^[a-z]+.*/ GROUP BY time(1m) END`,
|
||||
stmt: &influxql.CreateContinuousQueryStatement{
|
||||
Name: "myquery",
|
||||
Database: "testdb",
|
||||
Source: &influxql.SelectStatement{
|
||||
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "mean", Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}},
|
||||
Target: &influxql.Target{
|
||||
Measurement: &influxql.Measurement{RetentionPolicy: "policy1"},
|
||||
},
|
||||
Sources: []influxql.Source{&influxql.Measurement{Regex: &influxql.RegexLiteral{Val: regexp.MustCompile(`^[a-z]+.*`)}}},
|
||||
Dimensions: []*influxql.Dimension{
|
||||
{
|
||||
Expr: &influxql.Call{
|
||||
Name: "time",
|
||||
Args: []influxql.Expr{
|
||||
&influxql.DurationLiteral{Val: 1 * time.Minute},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
// CREATE DATABASE statement
|
||||
{
|
||||
s: `CREATE DATABASE testdb`,
|
||||
|
|
|
@ -95,6 +95,8 @@ func (s *Scanner) Scan() (tok Token, pos Pos, lit string) {
|
|||
return COMMA, pos, ""
|
||||
case ';':
|
||||
return SEMICOLON, pos, ""
|
||||
case ':':
|
||||
return COLON, pos, ""
|
||||
}
|
||||
|
||||
return ILLEGAL, pos, string(ch0)
|
||||
|
|
|
@ -50,6 +50,7 @@ const (
|
|||
LPAREN // (
|
||||
RPAREN // )
|
||||
COMMA // ,
|
||||
COLON // :
|
||||
SEMICOLON // ;
|
||||
DOT // .
|
||||
|
||||
|
@ -160,6 +161,7 @@ var tokens = [...]string{
|
|||
LPAREN: "(",
|
||||
RPAREN: ")",
|
||||
COMMA: ",",
|
||||
COLON: ":",
|
||||
SEMICOLON: ";",
|
||||
DOT: ".",
|
||||
|
||||
|
|
|
@ -302,8 +302,13 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
|||
}
|
||||
|
||||
for _, row := range result.Series {
|
||||
// Get the measurement name for the result.
|
||||
measurement := cq.intoMeasurement()
|
||||
if measurement == "" {
|
||||
measurement = row.Name
|
||||
}
|
||||
// Convert the result row to points.
|
||||
part, err := s.convertRowToPoints(cq.intoMeasurement(), row)
|
||||
part, err := s.convertRowToPoints(measurement, row)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
|
@ -346,7 +351,11 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
|||
}
|
||||
|
||||
if s.loggingEnabled {
|
||||
s.Logger.Printf("wrote %d point(s) to %s.%s.%s", len(points), cq.intoDB(), cq.intoRP(), cq.Info.Name)
|
||||
db := cq.intoDB()
|
||||
if db == "" {
|
||||
db = cq.Database
|
||||
}
|
||||
s.Logger.Printf("wrote %d point(s) to %s.%s", len(points), db, cq.intoRP())
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -61,6 +62,45 @@ func TestExecuteContinuousQuery_HappyPath(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Test ExecuteContinuousQuery when INTO measurements are taken from the FROM clause.
|
||||
func TestExecuteContinuousQuery_ReferenceSource(t *testing.T) {
|
||||
s := NewTestService(t)
|
||||
dbis, _ := s.MetaStore.Databases()
|
||||
dbi := dbis[2]
|
||||
cqi := dbi.ContinuousQueries[0]
|
||||
|
||||
rowCnt := 2
|
||||
pointCnt := 1
|
||||
qe := s.QueryExecutor.(*QueryExecutor)
|
||||
qe.Results = []*influxql.Result{genResult(rowCnt, pointCnt)}
|
||||
|
||||
pw := s.PointsWriter.(*PointsWriter)
|
||||
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
|
||||
if len(p.Points) != pointCnt*rowCnt {
|
||||
return fmt.Errorf("exp = %d, got = %d", pointCnt, len(p.Points))
|
||||
}
|
||||
|
||||
exp := "cpu,host=server01 value=0"
|
||||
got := p.Points[0].String()
|
||||
if !strings.Contains(got, exp) {
|
||||
return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp)
|
||||
}
|
||||
|
||||
exp = "cpu2,host=server01 value=0"
|
||||
got = p.Points[1].String()
|
||||
if !strings.Contains(got, exp) {
|
||||
return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err := s.ExecuteContinuousQuery(&dbi, &cqi)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Test the service happy path.
|
||||
func TestService_HappyPath(t *testing.T) {
|
||||
s := NewTestService(t)
|
||||
|
@ -70,7 +110,7 @@ func TestService_HappyPath(t *testing.T) {
|
|||
qe.Results = []*influxql.Result{genResult(1, pointCnt)}
|
||||
|
||||
pw := s.PointsWriter.(*PointsWriter)
|
||||
ch := make(chan int, 5)
|
||||
ch := make(chan int, 10)
|
||||
defer close(ch)
|
||||
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
|
||||
ch <- len(p.Points)
|
||||
|
@ -97,7 +137,7 @@ func TestService_Run(t *testing.T) {
|
|||
s.Config.RecomputePreviousN = 0
|
||||
|
||||
done := make(chan struct{})
|
||||
expectCallCnt := 2
|
||||
expectCallCnt := 3
|
||||
callCnt := 0
|
||||
|
||||
// Set a callback for ExecuteQuery.
|
||||
|
@ -252,6 +292,8 @@ func NewTestService(t *testing.T) *Service {
|
|||
ms.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s) END`)
|
||||
ms.CreateDatabase("db2", "default")
|
||||
ms.CreateContinuousQuery("db2", "cq2", `CREATE CONTINUOUS QUERY cq2 ON db2 BEGIN SELECT mean(value) INTO cpu_mean FROM cpu WHERE time > now() - 10m GROUP BY time(1m) END`)
|
||||
ms.CreateDatabase("db3", "default")
|
||||
ms.CreateContinuousQuery("db3", "cq3", `CREATE CONTINUOUS QUERY cq3 ON db3 BEGIN SELECT mean(value) INTO "1hAverages".:MEASUREMENT FROM /cpu[0-9]?/ GROUP BY time(10s) END`)
|
||||
|
||||
return s
|
||||
}
|
||||
|
@ -471,6 +513,9 @@ func genResult(rowCnt, valCnt int) *influxql.Result {
|
|||
Columns: []string{"time", "value"},
|
||||
Values: vals,
|
||||
}
|
||||
if len(rows) > 0 {
|
||||
row.Name = fmt.Sprintf("cpu%d", len(rows)+1)
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
return &influxql.Result{
|
||||
|
|
|
@ -868,8 +868,12 @@ func (q *QueryExecutor) normalizeStatement(stmt influxql.Statement, defaultDatab
|
|||
// normalizeMeasurement inserts the default database or policy into all measurement names,
|
||||
// if required.
|
||||
func (q *QueryExecutor) normalizeMeasurement(m *influxql.Measurement, defaultDatabase string) error {
|
||||
if m.Name == "" && m.Regex == nil {
|
||||
return errors.New("invalid measurement")
|
||||
// Targets (measurements in an INTO clause) can have blank names, which means it will be
|
||||
// the same as the measurement name it came from in the FROM clause.
|
||||
if _, ok := m.Parent.(*influxql.Target); !ok {
|
||||
if m.Name == "" && m.Regex == nil {
|
||||
return errors.New("invalid measurement")
|
||||
}
|
||||
}
|
||||
|
||||
// Measurement does not have an explicit database? Insert default.
|
||||
|
|
Loading…
Reference in New Issue