Merge pull request #3876 from influxdb/dgn-cq-backrefs

fix #2555: add backreference in CQs
pull/3975/head
dgnorton 2015-09-03 13:33:35 -04:00
commit 1c29e59082
12 changed files with 314 additions and 71 deletions

View File

@ -8,6 +8,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3892](https://github.com/influxdb/influxdb/pull/3892): Support IF NOT EXISTS for CREATE DATABASE - [#3892](https://github.com/influxdb/influxdb/pull/3892): Support IF NOT EXISTS for CREATE DATABASE
- [#3916](https://github.com/influxdb/influxdb/pull/3916): New statistics and diagnostics support. Graphite first to be instrumented. - [#3916](https://github.com/influxdb/influxdb/pull/3916): New statistics and diagnostics support. Graphite first to be instrumented.
- [#3901](https://github.com/influxdb/influxdb/pull/3901): Add consistency level option to influx cli Thanks @takayuki - [#3901](https://github.com/influxdb/influxdb/pull/3901): Add consistency level option to influx cli Thanks @takayuki
- [#3876](https://github.com/influxdb/influxdb/pull/3876): Allow the following syntax in CQs: INTO "1hPolicy".:MEASUREMENT
### Bugfixes ### Bugfixes
- [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803. - [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.

View File

@ -2,6 +2,7 @@
package run_test package run_test
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -105,10 +106,14 @@ func (s *Server) QueryWithParams(query string, values url.Values) (results strin
values = url.Values{} values = url.Values{}
} }
values.Set("q", query) values.Set("q", query)
resp, err := http.Get(s.URL() + "/query?" + values.Encode()) return s.HTTPGet(s.URL() + "/query?" + values.Encode())
}
// HTTPGet makes an HTTP GET request to the server and returns the response.
func (s *Server) HTTPGet(url string) (results string, err error) {
resp, err := http.Get(url)
if err != nil { if err != nil {
return "", err return "", err
//} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusBadRequest {
} }
body := string(MustReadAll(resp.Body)) body := string(MustReadAll(resp.Body))
switch resp.StatusCode { switch resp.StatusCode {
@ -124,6 +129,27 @@ func (s *Server) QueryWithParams(query string, values url.Values) (results strin
} }
} }
// HTTPPost makes an HTTP POST request to the server and returns the response.
func (s *Server) HTTPPost(url string, content []byte) (results string, err error) {
buf := bytes.NewBuffer(content)
resp, err := http.Post(url, "application/json", buf)
if err != nil {
return "", err
}
body := string(MustReadAll(resp.Body))
switch resp.StatusCode {
case http.StatusBadRequest:
if !expectPattern(".*error parsing query*.", body) {
return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body)
}
return body, nil
case http.StatusOK, http.StatusNoContent:
return body, nil
default:
return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body)
}
}
// Write executes a write against the server and returns the results. // Write executes a write against the server and returns the results.
func (s *Server) Write(db, rp, body string, params url.Values) (results string, err error) { func (s *Server) Write(db, rp, body string, params url.Values) (results string, err error) {
if params == nil { if params == nil {

View File

@ -3631,7 +3631,7 @@ func TestServer_Query_ShowFieldKeys(t *testing.T) {
} }
} }
func TestServer_Query_CreateContinuousQuery(t *testing.T) { func TestServer_ContinuousQuery(t *testing.T) {
t.Parallel() t.Parallel()
s := OpenServer(NewConfig(), "") s := OpenServer(NewConfig(), "")
defer s.Close() defer s.Close()
@ -3643,37 +3643,112 @@ func TestServer_Query_CreateContinuousQuery(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
test := NewTest("db0", "rp0") runTest := func(test *Test, t *testing.T) {
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
// Start times of CQ intervals.
interval0 := time.Now().Add(-time.Second).Round(time.Second * 5)
interval1 := interval0.Add(-time.Second * 5)
interval2 := interval0.Add(-time.Second * 10)
interval3 := interval0.Add(-time.Second * 15)
writes := []string{
// Point too far in the past for CQ to pick up.
fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, interval3.Add(time.Second).UnixNano()),
// Points two intervals ago.
fmt.Sprintf(`cpu,host=server01 value=100 %d`, interval2.Add(time.Second).UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, interval2.Add(time.Second*2).UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=useast value=100 %d`, interval2.Add(time.Second*3).UnixNano()),
// Points one interval ago.
fmt.Sprintf(`gpu,host=server02,region=useast value=100 %d`, interval1.Add(time.Second).UnixNano()),
fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, interval1.Add(time.Second*2).UnixNano()),
// Points in the current interval.
fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, interval0.Add(time.Second).UnixNano()),
fmt.Sprintf(`disk,host=server03,region=caeast value=100 %d`, interval0.Add(time.Second*2).UnixNano()),
}
test := NewTest("db0", "rp0")
test.write = strings.Join(writes, "\n")
test.addQueries([]*Query{ test.addQueries([]*Query{
&Query{ &Query{
name: "create continuous query", name: `create another retention policy for CQ to write into`,
command: `CREATE CONTINUOUS QUERY "my.query" ON db0 BEGIN SELECT count(value) INTO measure1 FROM myseries GROUP BY time(10m) END`, command: `CREATE RETENTION POLICY rp1 ON db0 DURATION 1h REPLICATION 1`,
exp: `{"results":[{}]}`,
},
&Query{
name: "create continuous query with backreference",
command: `CREATE CONTINUOUS QUERY "cq1" ON db0 BEGIN SELECT count(value) INTO "rp1".:MEASUREMENT FROM /[cg]pu/ GROUP BY time(5s) END`,
exp: `{"results":[{}]}`,
},
&Query{
name: `create another retention policy for CQ to write into`,
command: `CREATE RETENTION POLICY rp2 ON db0 DURATION 1h REPLICATION 1`,
exp: `{"results":[{}]}`,
},
&Query{
name: "create continuous query with backreference and group by time",
command: `CREATE CONTINUOUS QUERY "cq2" ON db0 BEGIN SELECT count(value) INTO "rp2".:MEASUREMENT FROM /[cg]pu/ GROUP BY time(5s), * END`,
exp: `{"results":[{}]}`, exp: `{"results":[{}]}`,
}, },
&Query{ &Query{
name: `show continuous queries`, name: `show continuous queries`,
command: `SHOW CONTINUOUS QUERIES`, command: `SHOW CONTINUOUS QUERIES`,
exp: `{"results":[{"series":[{"name":"db0","columns":["name","query"],"values":[["my.query","CREATE CONTINUOUS QUERY \"my.query\" ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp0\".measure1 FROM \"db0\".\"rp0\".myseries GROUP BY time(10m) END"]]}]}]}`, exp: `{"results":[{"series":[{"name":"db0","columns":["name","query"],"values":[["cq1","CREATE CONTINUOUS QUERY cq1 ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp1\".:MEASUREMENT FROM \"db0\".\"rp0\"./[cg]pu/ GROUP BY time(5s) END"],["cq2","CREATE CONTINUOUS QUERY cq2 ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp2\".:MEASUREMENT FROM \"db0\".\"rp0\"./[cg]pu/ GROUP BY time(5s), * END"]]}]}]}`,
}, },
}...) }...)
for i, query := range test.queries { // Run first test to create CQs.
if i == 0 { runTest(&test, t)
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err) // Trigger CQs to run.
} u := fmt.Sprintf("%s/data/process_continuous_queries?time=%d", s.URL(), interval0.UnixNano())
} if _, err := s.HTTPPost(u, nil); err != nil {
if query.skip { t.Fatal(err)
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
} }
// Wait for CQs to run. TODO: fix this ugly hack
time.Sleep(time.Second * 2)
// Setup tests to check the CQ results.
test2 := NewTest("db0", "rp1")
test2.addQueries([]*Query{
&Query{
name: "check results of cq1",
command: `SELECT * FROM "rp1"./[cg]pu/`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","count","host","region","value"],"values":[["` + interval2.UTC().Format(time.RFC3339Nano) + `",3,null,null,null]]},{"name":"gpu","columns":["time","count","host","region","value"],"values":[["` + interval1.UTC().Format(time.RFC3339Nano) + `",2,null,null,null],["` + interval0.UTC().Format(time.RFC3339Nano) + `",1,null,null,null]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
// TODO: restore this test once this is fixed: https://github.com/influxdb/influxdb/issues/3968
&Query{
skip: true,
name: "check results of cq2",
command: `SELECT * FROM "rp2"./[cg]pu/`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","count","host","region","value"],"values":[["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","uswest",null],["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","",null],["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","useast",null]]},{"name":"gpu","columns":["time","count","host","region","value"],"values":[["` + interval1.UTC().Format(time.RFC3339Nano) + `",1,"server02","useast",null],["` + interval1.UTC().Format(time.RFC3339Nano) + `",1,"server03","caeast",null],["` + interval0.UTC().Format(time.RFC3339Nano) + `",1,"server03","caeast",null]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
}...)
// Run second test to check CQ results.
runTest(&test2, t)
} }
// Tests that a known CQ query with concurrent writes does not deadlock the server // Tests that a known CQ query with concurrent writes does not deadlock the server

View File

@ -1508,6 +1508,9 @@ func (t *Target) String() string {
var buf bytes.Buffer var buf bytes.Buffer
_, _ = buf.WriteString("INTO ") _, _ = buf.WriteString("INTO ")
_, _ = buf.WriteString(t.Measurement.String()) _, _ = buf.WriteString(t.Measurement.String())
if t.Measurement.Name == "" {
_, _ = buf.WriteString(":MEASUREMENT")
}
return buf.String() return buf.String()
} }
@ -2166,6 +2169,7 @@ type Measurement struct {
RetentionPolicy string RetentionPolicy string
Name string Name string
Regex *RegexLiteral Regex *RegexLiteral
IsTarget bool
} }
// String returns a string representation of the measurement. // String returns a string representation of the measurement.

View File

@ -488,6 +488,9 @@ func (p *Parser) parseSegmentedIdents() ([]string, error) {
if ch := p.peekRune(); ch == '/' { if ch := p.peekRune(); ch == '/' {
// Next segment is a regex so we're done. // Next segment is a regex so we're done.
break break
} else if ch == ':' {
// Next segment is context-specific so let caller handle it.
break
} else if ch == '.' { } else if ch == '.' {
// Add an empty identifier. // Add an empty identifier.
idents = append(idents, "") idents = append(idents, "")
@ -799,7 +802,18 @@ func (p *Parser) parseTarget(tr targetRequirement) (*Target, error) {
return nil, err return nil, err
} }
t := &Target{Measurement: &Measurement{}} if len(idents) < 3 {
// Check for source measurement reference.
if ch := p.peekRune(); ch == ':' {
if err := p.parseTokens([]Token{COLON, MEASUREMENT}); err != nil {
return nil, err
}
// Append empty measurement name.
idents = append(idents, "")
}
}
t := &Target{Measurement: &Measurement{IsTarget: true}}
switch len(idents) { switch len(idents) {
case 1: case 1:

View File

@ -830,7 +830,7 @@ func TestParser_ParseStatement(t *testing.T) {
Database: "testdb", Database: "testdb",
Source: &influxql.SelectStatement{ Source: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}}}, Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}}},
Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1"}}, Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1", IsTarget: true}},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
Dimensions: []*influxql.Dimension{ Dimensions: []*influxql.Dimension{
{ {
@ -854,7 +854,7 @@ func TestParser_ParseStatement(t *testing.T) {
Source: &influxql.SelectStatement{ Source: &influxql.SelectStatement{
IsRawQuery: true, IsRawQuery: true,
Fields: []*influxql.Field{{Expr: &influxql.Wildcard{}}}, Fields: []*influxql.Field{{Expr: &influxql.Wildcard{}}},
Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1"}}, Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1", IsTarget: true}},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu_load_short"}}, Sources: []influxql.Source{&influxql.Measurement{Name: "cpu_load_short"}},
}, },
}, },
@ -869,7 +869,7 @@ func TestParser_ParseStatement(t *testing.T) {
Source: &influxql.SelectStatement{ Source: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}}}, Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}}},
Target: &influxql.Target{ Target: &influxql.Target{
Measurement: &influxql.Measurement{RetentionPolicy: "1h.policy1", Name: "cpu.load"}, Measurement: &influxql.Measurement{RetentionPolicy: "1h.policy1", Name: "cpu.load", IsTarget: true},
}, },
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
Dimensions: []*influxql.Dimension{ Dimensions: []*influxql.Dimension{
@ -896,7 +896,7 @@ func TestParser_ParseStatement(t *testing.T) {
IsRawQuery: true, IsRawQuery: true,
Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "value"}}}, Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "value"}}},
Target: &influxql.Target{ Target: &influxql.Target{
Measurement: &influxql.Measurement{RetentionPolicy: "policy1", Name: "value"}, Measurement: &influxql.Measurement{RetentionPolicy: "policy1", Name: "value", IsTarget: true},
}, },
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
}, },
@ -914,13 +914,39 @@ func TestParser_ParseStatement(t *testing.T) {
Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "transmit_rx"}}, Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "transmit_rx"}},
{Expr: &influxql.VarRef{Val: "transmit_tx"}}}, {Expr: &influxql.VarRef{Val: "transmit_tx"}}},
Target: &influxql.Target{ Target: &influxql.Target{
Measurement: &influxql.Measurement{RetentionPolicy: "policy1", Name: "network"}, Measurement: &influxql.Measurement{RetentionPolicy: "policy1", Name: "network", IsTarget: true},
}, },
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
}, },
}, },
}, },
// CREATE CONTINUOUS QUERY with backreference measurement name
{
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT mean(value) INTO "policy1".:measurement FROM /^[a-z]+.*/ GROUP BY time(1m) END`,
stmt: &influxql.CreateContinuousQueryStatement{
Name: "myquery",
Database: "testdb",
Source: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "mean", Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}},
Target: &influxql.Target{
Measurement: &influxql.Measurement{RetentionPolicy: "policy1", IsTarget: true},
},
Sources: []influxql.Source{&influxql.Measurement{Regex: &influxql.RegexLiteral{Val: regexp.MustCompile(`^[a-z]+.*`)}}},
Dimensions: []*influxql.Dimension{
{
Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: 1 * time.Minute},
},
},
},
},
},
},
},
// CREATE DATABASE statement // CREATE DATABASE statement
{ {
s: `CREATE DATABASE testdb`, s: `CREATE DATABASE testdb`,

View File

@ -95,6 +95,8 @@ func (s *Scanner) Scan() (tok Token, pos Pos, lit string) {
return COMMA, pos, "" return COMMA, pos, ""
case ';': case ';':
return SEMICOLON, pos, "" return SEMICOLON, pos, ""
case ':':
return COLON, pos, ""
} }
return ILLEGAL, pos, string(ch0) return ILLEGAL, pos, string(ch0)

View File

@ -50,6 +50,7 @@ const (
LPAREN // ( LPAREN // (
RPAREN // ) RPAREN // )
COMMA // , COMMA // ,
COLON // :
SEMICOLON // ; SEMICOLON // ;
DOT // . DOT // .
@ -160,6 +161,7 @@ var tokens = [...]string{
LPAREN: "(", LPAREN: "(",
RPAREN: ")", RPAREN: ")",
COMMA: ",", COMMA: ",",
COLON: ":",
SEMICOLON: ";", SEMICOLON: ";",
DOT: ".", DOT: ".",

View File

@ -23,7 +23,7 @@ const (
// ContinuousQuerier represents a service that executes continuous queries. // ContinuousQuerier represents a service that executes continuous queries.
type ContinuousQuerier interface { type ContinuousQuerier interface {
// Run executes the named query in the named database. Blank database or name matches all. // Run executes the named query in the named database. Blank database or name matches all.
Run(database, name string) error Run(database, name string, t time.Time) error
} }
// queryExecutor is an internal interface to make testing easier. // queryExecutor is an internal interface to make testing easier.
@ -43,6 +43,28 @@ type pointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error WritePoints(p *cluster.WritePointsRequest) error
} }
// RunRequest is a request to run one or more CQs.
type RunRequest struct {
// Now tells the CQ serivce what the current time is.
Now time.Time
// CQs tells the CQ service which queries to run.
// If nil, all queries will be run.
CQs []string
}
// matches returns true if the CQ matches one of the requested CQs.
func (rr *RunRequest) matches(cq *meta.ContinuousQueryInfo) bool {
if rr.CQs == nil {
return true
}
for _, q := range rr.CQs {
if q == cq.Name {
return true
}
}
return false
}
// Service manages continuous query execution. // Service manages continuous query execution.
type Service struct { type Service struct {
MetaStore metaStore MetaStore metaStore
@ -51,7 +73,7 @@ type Service struct {
Config *Config Config *Config
RunInterval time.Duration RunInterval time.Duration
// RunCh can be used by clients to signal service to run CQs. // RunCh can be used by clients to signal service to run CQs.
RunCh chan struct{} RunCh chan *RunRequest
Logger *log.Logger Logger *log.Logger
loggingEnabled bool loggingEnabled bool
// lastRuns maps CQ name to last time it was run. // lastRuns maps CQ name to last time it was run.
@ -65,7 +87,7 @@ func NewService(c Config) *Service {
s := &Service{ s := &Service{
Config: &c, Config: &c,
RunInterval: time.Second, RunInterval: time.Second,
RunCh: make(chan struct{}), RunCh: make(chan *RunRequest),
loggingEnabled: c.LogEnabled, loggingEnabled: c.LogEnabled,
Logger: log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags), Logger: log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags),
lastRuns: map[string]time.Time{}, lastRuns: map[string]time.Time{},
@ -112,7 +134,7 @@ func (s *Service) SetLogger(l *log.Logger) {
} }
// Run runs the specified continuous query, or all CQs if none is specified. // Run runs the specified continuous query, or all CQs if none is specified.
func (s *Service) Run(database, name string) error { func (s *Service) Run(database, name string, t time.Time) error {
var dbs []meta.DatabaseInfo var dbs []meta.DatabaseInfo
if database != "" { if database != "" {
@ -145,7 +167,7 @@ func (s *Service) Run(database, name string) error {
} }
// Signal the background routine to run CQs. // Signal the background routine to run CQs.
s.RunCh <- struct{}{} s.RunCh <- &RunRequest{Now: t}
return nil return nil
} }
@ -158,21 +180,21 @@ func (s *Service) backgroundLoop() {
case <-s.stop: case <-s.stop:
s.Logger.Println("continuous query service terminating") s.Logger.Println("continuous query service terminating")
return return
case <-s.RunCh: case req := <-s.RunCh:
if s.MetaStore.IsLeader() { if s.MetaStore.IsLeader() {
s.Logger.Print("running continuous queries by request") s.Logger.Printf("running continuous queries by request for time: %v", req.Now.UnixNano())
s.runContinuousQueries() s.runContinuousQueries(req)
} }
case <-time.After(s.RunInterval): case <-time.After(s.RunInterval):
if s.MetaStore.IsLeader() { if s.MetaStore.IsLeader() {
s.runContinuousQueries() s.runContinuousQueries(&RunRequest{Now: time.Now()})
} }
} }
} }
} }
// runContinuousQueries gets CQs from the meta store and runs them. // runContinuousQueries gets CQs from the meta store and runs them.
func (s *Service) runContinuousQueries() { func (s *Service) runContinuousQueries(req *RunRequest) {
// Get list of all databases. // Get list of all databases.
dbs, err := s.MetaStore.Databases() dbs, err := s.MetaStore.Databases()
if err != nil { if err != nil {
@ -183,7 +205,10 @@ func (s *Service) runContinuousQueries() {
for _, db := range dbs { for _, db := range dbs {
// TODO: distribute across nodes // TODO: distribute across nodes
for _, cq := range db.ContinuousQueries { for _, cq := range db.ContinuousQueries {
if err := s.ExecuteContinuousQuery(&db, &cq); err != nil { if !req.matches(&cq) {
continue
}
if err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil {
s.Logger.Printf("error executing query: %s: err = %s", cq.Query, err) s.Logger.Printf("error executing query: %s: err = %s", cq.Query, err)
} }
} }
@ -191,7 +216,7 @@ func (s *Service) runContinuousQueries() {
} }
// ExecuteContinuousQuery executes a single CQ. // ExecuteContinuousQuery executes a single CQ.
func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo) error { func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) error {
// TODO: re-enable stats // TODO: re-enable stats
//s.stats.Inc("continuousQueryExecuted") //s.stats.Inc("continuousQueryExecuted")
@ -219,9 +244,9 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
} }
// We're about to run the query so store the time. // We're about to run the query so store the time.
now := time.Now() lastRun := time.Now()
cq.LastRun = now cq.LastRun = lastRun
s.lastRuns[cqi.Name] = now s.lastRuns[cqi.Name] = lastRun
// Get the group by interval. // Get the group by interval.
interval, err := cq.q.GroupByInterval() interval, err := cq.q.GroupByInterval()
@ -288,12 +313,6 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
return err return err
} }
// Drain results
defer func() {
for _ = range ch {
}
}()
// Read all rows from the result channel. // Read all rows from the result channel.
points := make([]tsdb.Point, 0, 100) points := make([]tsdb.Point, 0, 100)
for result := range ch { for result := range ch {
@ -302,8 +321,13 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
} }
for _, row := range result.Series { for _, row := range result.Series {
// Get the measurement name for the result.
measurement := cq.intoMeasurement()
if measurement == "" {
measurement = row.Name
}
// Convert the result row to points. // Convert the result row to points.
part, err := s.convertRowToPoints(cq.intoMeasurement(), row) part, err := s.convertRowToPoints(measurement, row)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
continue continue
@ -346,7 +370,7 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
} }
if s.loggingEnabled { if s.loggingEnabled {
s.Logger.Printf("wrote %d point(s) to %s.%s.%s", len(points), cq.intoDB(), cq.intoRP(), cq.Info.Name) s.Logger.Printf("wrote %d point(s) to %s.%s", len(points), cq.intoDB(), cq.intoRP())
} }
return nil return nil
@ -393,7 +417,13 @@ type ContinuousQuery struct {
q *influxql.SelectStatement q *influxql.SelectStatement
} }
func (cq *ContinuousQuery) intoDB() string { return cq.q.Target.Measurement.Database } func (cq *ContinuousQuery) intoDB() string {
if cq.q.Target.Measurement.Database != "" {
return cq.q.Target.Measurement.Database
}
return cq.Database
}
func (cq *ContinuousQuery) intoRP() string { return cq.q.Target.Measurement.RetentionPolicy } func (cq *ContinuousQuery) intoRP() string { return cq.q.Target.Measurement.RetentionPolicy }
func (cq *ContinuousQuery) setIntoRP(rp string) { cq.q.Target.Measurement.RetentionPolicy = rp } func (cq *ContinuousQuery) setIntoRP(rp string) { cq.q.Target.Measurement.RetentionPolicy = rp }
func (cq *ContinuousQuery) intoMeasurement() string { return cq.q.Target.Measurement.Name } func (cq *ContinuousQuery) intoMeasurement() string { return cq.q.Target.Measurement.Name }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"strings"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -36,8 +37,8 @@ func TestOpenAndClose(t *testing.T) {
} }
} }
// Test ExecuteContinuousQuery happy path. // Test ExecuteContinuousQuery.
func TestExecuteContinuousQuery_HappyPath(t *testing.T) { func TestExecuteContinuousQuery(t *testing.T) {
s := NewTestService(t) s := NewTestService(t)
dbis, _ := s.MetaStore.Databases() dbis, _ := s.MetaStore.Databases()
dbi := dbis[0] dbi := dbis[0]
@ -55,14 +56,53 @@ func TestExecuteContinuousQuery_HappyPath(t *testing.T) {
return nil return nil
} }
err := s.ExecuteContinuousQuery(&dbi, &cqi) err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
if err != nil {
t.Error(err)
}
}
// Test ExecuteContinuousQuery when INTO measurements are taken from the FROM clause.
func TestExecuteContinuousQuery_ReferenceSource(t *testing.T) {
s := NewTestService(t)
dbis, _ := s.MetaStore.Databases()
dbi := dbis[2]
cqi := dbi.ContinuousQueries[0]
rowCnt := 2
pointCnt := 1
qe := s.QueryExecutor.(*QueryExecutor)
qe.Results = []*influxql.Result{genResult(rowCnt, pointCnt)}
pw := s.PointsWriter.(*PointsWriter)
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
if len(p.Points) != pointCnt*rowCnt {
return fmt.Errorf("exp = %d, got = %d", pointCnt, len(p.Points))
}
exp := "cpu,host=server01 value=0"
got := p.Points[0].String()
if !strings.Contains(got, exp) {
return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp)
}
exp = "cpu2,host=server01 value=0"
got = p.Points[1].String()
if !strings.Contains(got, exp) {
return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp)
}
return nil
}
err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
} }
// Test the service happy path. // Test the service happy path.
func TestService_HappyPath(t *testing.T) { func TestContinuousQueryService(t *testing.T) {
s := NewTestService(t) s := NewTestService(t)
pointCnt := 100 pointCnt := 100
@ -70,7 +110,7 @@ func TestService_HappyPath(t *testing.T) {
qe.Results = []*influxql.Result{genResult(1, pointCnt)} qe.Results = []*influxql.Result{genResult(1, pointCnt)}
pw := s.PointsWriter.(*PointsWriter) pw := s.PointsWriter.(*PointsWriter)
ch := make(chan int, 5) ch := make(chan int, 10)
defer close(ch) defer close(ch)
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error { pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
ch <- len(p.Points) ch <- len(p.Points)
@ -87,7 +127,7 @@ func TestService_HappyPath(t *testing.T) {
} }
// Test Run method. // Test Run method.
func TestService_Run(t *testing.T) { func TestContinuousQueryService_Run(t *testing.T) {
s := NewTestService(t) s := NewTestService(t)
// Set RunInterval high so we can trigger using Run method. // Set RunInterval high so we can trigger using Run method.
@ -97,7 +137,7 @@ func TestService_Run(t *testing.T) {
s.Config.RecomputePreviousN = 0 s.Config.RecomputePreviousN = 0
done := make(chan struct{}) done := make(chan struct{})
expectCallCnt := 2 expectCallCnt := 3
callCnt := 0 callCnt := 0
// Set a callback for ExecuteQuery. // Set a callback for ExecuteQuery.
@ -112,7 +152,7 @@ func TestService_Run(t *testing.T) {
s.Open() s.Open()
// Trigger service to run all CQs. // Trigger service to run all CQs.
s.Run("", "") s.Run("", "", time.Now())
// Shouldn't time out. // Shouldn't time out.
if err := wait(done, 100*time.Millisecond); err != nil { if err := wait(done, 100*time.Millisecond); err != nil {
t.Error(err) t.Error(err)
@ -127,7 +167,7 @@ func TestService_Run(t *testing.T) {
expectCallCnt = 1 expectCallCnt = 1
callCnt = 0 callCnt = 0
s.Open() s.Open()
s.Run("db", "cq") s.Run("db", "cq", time.Now())
// Shouldn't time out. // Shouldn't time out.
if err := wait(done, 100*time.Millisecond); err != nil { if err := wait(done, 100*time.Millisecond); err != nil {
t.Error(err) t.Error(err)
@ -140,7 +180,7 @@ func TestService_Run(t *testing.T) {
} }
// Test service when not the cluster leader (CQs shouldn't run). // Test service when not the cluster leader (CQs shouldn't run).
func TestService_NotLeader(t *testing.T) { func TestContinuousQueryService_NotLeader(t *testing.T) {
s := NewTestService(t) s := NewTestService(t)
// Set RunInterval high so we can test triggering with the RunCh below. // Set RunInterval high so we can test triggering with the RunCh below.
s.RunInterval = 10 * time.Second s.RunInterval = 10 * time.Second
@ -156,7 +196,7 @@ func TestService_NotLeader(t *testing.T) {
s.Open() s.Open()
// Trigger service to run CQs. // Trigger service to run CQs.
s.RunCh <- struct{}{} s.RunCh <- &RunRequest{Now: time.Now()}
// Expect timeout error because ExecuteQuery callback wasn't called. // Expect timeout error because ExecuteQuery callback wasn't called.
if err := wait(done, 100*time.Millisecond); err == nil { if err := wait(done, 100*time.Millisecond); err == nil {
t.Error(err) t.Error(err)
@ -165,7 +205,7 @@ func TestService_NotLeader(t *testing.T) {
} }
// Test service behavior when meta store fails to get databases. // Test service behavior when meta store fails to get databases.
func TestService_MetaStoreFailsToGetDatabases(t *testing.T) { func TestContinuousQueryService_MetaStoreFailsToGetDatabases(t *testing.T) {
s := NewTestService(t) s := NewTestService(t)
// Set RunInterval high so we can test triggering with the RunCh below. // Set RunInterval high so we can test triggering with the RunCh below.
s.RunInterval = 10 * time.Second s.RunInterval = 10 * time.Second
@ -181,7 +221,7 @@ func TestService_MetaStoreFailsToGetDatabases(t *testing.T) {
s.Open() s.Open()
// Trigger service to run CQs. // Trigger service to run CQs.
s.RunCh <- struct{}{} s.RunCh <- &RunRequest{Now: time.Now()}
// Expect timeout error because ExecuteQuery callback wasn't called. // Expect timeout error because ExecuteQuery callback wasn't called.
if err := wait(done, 100*time.Millisecond); err == nil { if err := wait(done, 100*time.Millisecond); err == nil {
t.Error(err) t.Error(err)
@ -197,21 +237,21 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
cqi := dbi.ContinuousQueries[0] cqi := dbi.ContinuousQueries[0]
cqi.Query = `this is not a query` cqi.Query = `this is not a query`
err := s.ExecuteContinuousQuery(&dbi, &cqi) err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
if err == nil { if err == nil {
t.Error("expected error but got nil") t.Error("expected error but got nil")
} }
// Valid query but invalid continuous query. // Valid query but invalid continuous query.
cqi.Query = `SELECT * FROM cpu` cqi.Query = `SELECT * FROM cpu`
err = s.ExecuteContinuousQuery(&dbi, &cqi) err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
if err == nil { if err == nil {
t.Error("expected error but got nil") t.Error("expected error but got nil")
} }
// Group by requires aggregate. // Group by requires aggregate.
cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s)` cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s)`
err = s.ExecuteContinuousQuery(&dbi, &cqi) err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
if err == nil { if err == nil {
t.Error("expected error but got nil") t.Error("expected error but got nil")
} }
@ -227,7 +267,7 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
dbi := dbis[0] dbi := dbis[0]
cqi := dbi.ContinuousQueries[0] cqi := dbi.ContinuousQueries[0]
err := s.ExecuteContinuousQuery(&dbi, &cqi) err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
if err != expectedErr { if err != expectedErr {
t.Errorf("exp = %s, got = %v", expectedErr, err) t.Errorf("exp = %s, got = %v", expectedErr, err)
} }
@ -252,6 +292,8 @@ func NewTestService(t *testing.T) *Service {
ms.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s) END`) ms.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s) END`)
ms.CreateDatabase("db2", "default") ms.CreateDatabase("db2", "default")
ms.CreateContinuousQuery("db2", "cq2", `CREATE CONTINUOUS QUERY cq2 ON db2 BEGIN SELECT mean(value) INTO cpu_mean FROM cpu WHERE time > now() - 10m GROUP BY time(1m) END`) ms.CreateContinuousQuery("db2", "cq2", `CREATE CONTINUOUS QUERY cq2 ON db2 BEGIN SELECT mean(value) INTO cpu_mean FROM cpu WHERE time > now() - 10m GROUP BY time(1m) END`)
ms.CreateDatabase("db3", "default")
ms.CreateContinuousQuery("db3", "cq3", `CREATE CONTINUOUS QUERY cq3 ON db3 BEGIN SELECT mean(value) INTO "1hAverages".:MEASUREMENT FROM /cpu[0-9]?/ GROUP BY time(10s) END`)
return s return s
} }
@ -471,6 +513,9 @@ func genResult(rowCnt, valCnt int) *influxql.Result {
Columns: []string{"time", "value"}, Columns: []string{"time", "value"},
Values: vals, Values: vals,
} }
if len(rows) > 0 {
row.Name = fmt.Sprintf("cpu%d", len(rows)+1)
}
rows = append(rows, row) rows = append(rows, row)
} }
return &influxql.Result{ return &influxql.Result{

View File

@ -182,9 +182,25 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R
db := q.Get("db") db := q.Get("db")
// Get the name of the CQ to run (blank means run all). // Get the name of the CQ to run (blank means run all).
name := q.Get("name") name := q.Get("name")
// Get the time for which the CQ should be evaluated.
var t time.Time
var err error
s := q.Get("time")
if s != "" {
t, err = time.Parse(time.RFC3339Nano, s)
if err != nil {
// Try parsing as an int64 nanosecond timestamp.
i, err := strconv.ParseInt(s, 10, 64)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
t = time.Unix(0, i)
}
}
// Pass the request to the CQ service. // Pass the request to the CQ service.
if err := h.ContinuousQuerier.Run(db, name); err != nil { if err := h.ContinuousQuerier.Run(db, name, t); err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }

View File

@ -868,7 +868,9 @@ func (q *QueryExecutor) normalizeStatement(stmt influxql.Statement, defaultDatab
// normalizeMeasurement inserts the default database or policy into all measurement names, // normalizeMeasurement inserts the default database or policy into all measurement names,
// if required. // if required.
func (q *QueryExecutor) normalizeMeasurement(m *influxql.Measurement, defaultDatabase string) error { func (q *QueryExecutor) normalizeMeasurement(m *influxql.Measurement, defaultDatabase string) error {
if m.Name == "" && m.Regex == nil { // Targets (measurements in an INTO clause) can have blank names, which means it will be
// the same as the measurement name it came from in the FROM clause.
if !m.IsTarget && m.Name == "" && m.Regex == nil {
return errors.New("invalid measurement") return errors.New("invalid measurement")
} }