diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b0aae4499..eda3ef352b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ With this release InfluxDB is moving to Go 1.5. - [#3892](https://github.com/influxdb/influxdb/pull/3892): Support IF NOT EXISTS for CREATE DATABASE - [#3916](https://github.com/influxdb/influxdb/pull/3916): New statistics and diagnostics support. Graphite first to be instrumented. - [#3901](https://github.com/influxdb/influxdb/pull/3901): Add consistency level option to influx cli Thanks @takayuki +- [#3876](https://github.com/influxdb/influxdb/pull/3876): Allow the following syntax in CQs: INTO "1hPolicy".:MEASUREMENT ### Bugfixes - [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803. diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index f8e6c5b1c6..8f50b20571 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -2,6 +2,7 @@ package run_test import ( + "bytes" "encoding/json" "fmt" "io" @@ -105,10 +106,14 @@ func (s *Server) QueryWithParams(query string, values url.Values) (results strin values = url.Values{} } values.Set("q", query) - resp, err := http.Get(s.URL() + "/query?" + values.Encode()) + return s.HTTPGet(s.URL() + "/query?" + values.Encode()) +} + +// HTTPGet makes an HTTP GET request to the server and returns the response. +func (s *Server) HTTPGet(url string) (results string, err error) { + resp, err := http.Get(url) if err != nil { return "", err - //} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusBadRequest { } body := string(MustReadAll(resp.Body)) switch resp.StatusCode { @@ -124,6 +129,27 @@ func (s *Server) QueryWithParams(query string, values url.Values) (results strin } } +// HTTPPost makes an HTTP POST request to the server and returns the response. +func (s *Server) HTTPPost(url string, content []byte) (results string, err error) { + buf := bytes.NewBuffer(content) + resp, err := http.Post(url, "application/json", buf) + if err != nil { + return "", err + } + body := string(MustReadAll(resp.Body)) + switch resp.StatusCode { + case http.StatusBadRequest: + if !expectPattern(".*error parsing query*.", body) { + return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body) + } + return body, nil + case http.StatusOK, http.StatusNoContent: + return body, nil + default: + return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body) + } +} + // Write executes a write against the server and returns the results. func (s *Server) Write(db, rp, body string, params url.Values) (results string, err error) { if params == nil { diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 0f6d4cecc1..e18a7d15fb 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -3631,7 +3631,7 @@ func TestServer_Query_ShowFieldKeys(t *testing.T) { } } -func TestServer_Query_CreateContinuousQuery(t *testing.T) { +func TestServer_ContinuousQuery(t *testing.T) { t.Parallel() s := OpenServer(NewConfig(), "") defer s.Close() @@ -3643,37 +3643,112 @@ func TestServer_Query_CreateContinuousQuery(t *testing.T) { t.Fatal(err) } - test := NewTest("db0", "rp0") + runTest := func(test *Test, t *testing.T) { + for i, query := range test.queries { + if i == 0 { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } + } + // Start times of CQ intervals. + interval0 := time.Now().Add(-time.Second).Round(time.Second * 5) + interval1 := interval0.Add(-time.Second * 5) + interval2 := interval0.Add(-time.Second * 10) + interval3 := interval0.Add(-time.Second * 15) + + writes := []string{ + // Point too far in the past for CQ to pick up. + fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, interval3.Add(time.Second).UnixNano()), + + // Points two intervals ago. + fmt.Sprintf(`cpu,host=server01 value=100 %d`, interval2.Add(time.Second).UnixNano()), + fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, interval2.Add(time.Second*2).UnixNano()), + fmt.Sprintf(`cpu,host=server01,region=useast value=100 %d`, interval2.Add(time.Second*3).UnixNano()), + + // Points one interval ago. + fmt.Sprintf(`gpu,host=server02,region=useast value=100 %d`, interval1.Add(time.Second).UnixNano()), + fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, interval1.Add(time.Second*2).UnixNano()), + + // Points in the current interval. + fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, interval0.Add(time.Second).UnixNano()), + fmt.Sprintf(`disk,host=server03,region=caeast value=100 %d`, interval0.Add(time.Second*2).UnixNano()), + } + + test := NewTest("db0", "rp0") + test.write = strings.Join(writes, "\n") test.addQueries([]*Query{ &Query{ - name: "create continuous query", - command: `CREATE CONTINUOUS QUERY "my.query" ON db0 BEGIN SELECT count(value) INTO measure1 FROM myseries GROUP BY time(10m) END`, + name: `create another retention policy for CQ to write into`, + command: `CREATE RETENTION POLICY rp1 ON db0 DURATION 1h REPLICATION 1`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "create continuous query with backreference", + command: `CREATE CONTINUOUS QUERY "cq1" ON db0 BEGIN SELECT count(value) INTO "rp1".:MEASUREMENT FROM /[cg]pu/ GROUP BY time(5s) END`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: `create another retention policy for CQ to write into`, + command: `CREATE RETENTION POLICY rp2 ON db0 DURATION 1h REPLICATION 1`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "create continuous query with backreference and group by time", + command: `CREATE CONTINUOUS QUERY "cq2" ON db0 BEGIN SELECT count(value) INTO "rp2".:MEASUREMENT FROM /[cg]pu/ GROUP BY time(5s), * END`, exp: `{"results":[{}]}`, }, &Query{ name: `show continuous queries`, command: `SHOW CONTINUOUS QUERIES`, - exp: `{"results":[{"series":[{"name":"db0","columns":["name","query"],"values":[["my.query","CREATE CONTINUOUS QUERY \"my.query\" ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp0\".measure1 FROM \"db0\".\"rp0\".myseries GROUP BY time(10m) END"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"db0","columns":["name","query"],"values":[["cq1","CREATE CONTINUOUS QUERY cq1 ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp1\".:MEASUREMENT FROM \"db0\".\"rp0\"./[cg]pu/ GROUP BY time(5s) END"],["cq2","CREATE CONTINUOUS QUERY cq2 ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp2\".:MEASUREMENT FROM \"db0\".\"rp0\"./[cg]pu/ GROUP BY time(5s), * END"]]}]}]}`, }, }...) - for i, query := range test.queries { - if i == 0 { - if err := test.init(s); err != nil { - t.Fatalf("test init failed: %s", err) - } - } - if query.skip { - t.Logf("SKIP:: %s", query.name) - continue - } - if err := query.Execute(s); err != nil { - t.Error(query.Error(err)) - } else if !query.success() { - t.Error(query.failureMessage()) - } + // Run first test to create CQs. + runTest(&test, t) + + // Trigger CQs to run. + u := fmt.Sprintf("%s/data/process_continuous_queries?time=%d", s.URL(), interval0.UnixNano()) + if _, err := s.HTTPPost(u, nil); err != nil { + t.Fatal(err) } + + // Wait for CQs to run. TODO: fix this ugly hack + time.Sleep(time.Second * 2) + + // Setup tests to check the CQ results. + test2 := NewTest("db0", "rp1") + test2.addQueries([]*Query{ + &Query{ + name: "check results of cq1", + command: `SELECT * FROM "rp1"./[cg]pu/`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","count","host","region","value"],"values":[["` + interval2.UTC().Format(time.RFC3339Nano) + `",3,null,null,null]]},{"name":"gpu","columns":["time","count","host","region","value"],"values":[["` + interval1.UTC().Format(time.RFC3339Nano) + `",2,null,null,null],["` + interval0.UTC().Format(time.RFC3339Nano) + `",1,null,null,null]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + // TODO: restore this test once this is fixed: https://github.com/influxdb/influxdb/issues/3968 + &Query{ + skip: true, + name: "check results of cq2", + command: `SELECT * FROM "rp2"./[cg]pu/`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","count","host","region","value"],"values":[["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","uswest",null],["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","",null],["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","useast",null]]},{"name":"gpu","columns":["time","count","host","region","value"],"values":[["` + interval1.UTC().Format(time.RFC3339Nano) + `",1,"server02","useast",null],["` + interval1.UTC().Format(time.RFC3339Nano) + `",1,"server03","caeast",null],["` + interval0.UTC().Format(time.RFC3339Nano) + `",1,"server03","caeast",null]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + }...) + + // Run second test to check CQ results. + runTest(&test2, t) } // Tests that a known CQ query with concurrent writes does not deadlock the server diff --git a/influxql/ast.go b/influxql/ast.go index 2c139b98ac..f2803c49fa 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1508,6 +1508,9 @@ func (t *Target) String() string { var buf bytes.Buffer _, _ = buf.WriteString("INTO ") _, _ = buf.WriteString(t.Measurement.String()) + if t.Measurement.Name == "" { + _, _ = buf.WriteString(":MEASUREMENT") + } return buf.String() } @@ -2166,6 +2169,7 @@ type Measurement struct { RetentionPolicy string Name string Regex *RegexLiteral + IsTarget bool } // String returns a string representation of the measurement. diff --git a/influxql/parser.go b/influxql/parser.go index bafe42d83d..e0a76a50b7 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -488,6 +488,9 @@ func (p *Parser) parseSegmentedIdents() ([]string, error) { if ch := p.peekRune(); ch == '/' { // Next segment is a regex so we're done. break + } else if ch == ':' { + // Next segment is context-specific so let caller handle it. + break } else if ch == '.' { // Add an empty identifier. idents = append(idents, "") @@ -799,7 +802,18 @@ func (p *Parser) parseTarget(tr targetRequirement) (*Target, error) { return nil, err } - t := &Target{Measurement: &Measurement{}} + if len(idents) < 3 { + // Check for source measurement reference. + if ch := p.peekRune(); ch == ':' { + if err := p.parseTokens([]Token{COLON, MEASUREMENT}); err != nil { + return nil, err + } + // Append empty measurement name. + idents = append(idents, "") + } + } + + t := &Target{Measurement: &Measurement{IsTarget: true}} switch len(idents) { case 1: diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 489ecefde1..fee4f791b0 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -830,7 +830,7 @@ func TestParser_ParseStatement(t *testing.T) { Database: "testdb", Source: &influxql.SelectStatement{ Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}}}, - Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1"}}, + Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1", IsTarget: true}}, Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, Dimensions: []*influxql.Dimension{ { @@ -854,7 +854,7 @@ func TestParser_ParseStatement(t *testing.T) { Source: &influxql.SelectStatement{ IsRawQuery: true, Fields: []*influxql.Field{{Expr: &influxql.Wildcard{}}}, - Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1"}}, + Target: &influxql.Target{Measurement: &influxql.Measurement{Name: "measure1", IsTarget: true}}, Sources: []influxql.Source{&influxql.Measurement{Name: "cpu_load_short"}}, }, }, @@ -869,7 +869,7 @@ func TestParser_ParseStatement(t *testing.T) { Source: &influxql.SelectStatement{ Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}}}, Target: &influxql.Target{ - Measurement: &influxql.Measurement{RetentionPolicy: "1h.policy1", Name: "cpu.load"}, + Measurement: &influxql.Measurement{RetentionPolicy: "1h.policy1", Name: "cpu.load", IsTarget: true}, }, Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, Dimensions: []*influxql.Dimension{ @@ -896,7 +896,7 @@ func TestParser_ParseStatement(t *testing.T) { IsRawQuery: true, Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "value"}}}, Target: &influxql.Target{ - Measurement: &influxql.Measurement{RetentionPolicy: "policy1", Name: "value"}, + Measurement: &influxql.Measurement{RetentionPolicy: "policy1", Name: "value", IsTarget: true}, }, Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, }, @@ -914,13 +914,39 @@ func TestParser_ParseStatement(t *testing.T) { Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "transmit_rx"}}, {Expr: &influxql.VarRef{Val: "transmit_tx"}}}, Target: &influxql.Target{ - Measurement: &influxql.Measurement{RetentionPolicy: "policy1", Name: "network"}, + Measurement: &influxql.Measurement{RetentionPolicy: "policy1", Name: "network", IsTarget: true}, }, Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, }, }, }, + // CREATE CONTINUOUS QUERY with backreference measurement name + { + s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT mean(value) INTO "policy1".:measurement FROM /^[a-z]+.*/ GROUP BY time(1m) END`, + stmt: &influxql.CreateContinuousQueryStatement{ + Name: "myquery", + Database: "testdb", + Source: &influxql.SelectStatement{ + Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "mean", Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}}, + Target: &influxql.Target{ + Measurement: &influxql.Measurement{RetentionPolicy: "policy1", IsTarget: true}, + }, + Sources: []influxql.Source{&influxql.Measurement{Regex: &influxql.RegexLiteral{Val: regexp.MustCompile(`^[a-z]+.*`)}}}, + Dimensions: []*influxql.Dimension{ + { + Expr: &influxql.Call{ + Name: "time", + Args: []influxql.Expr{ + &influxql.DurationLiteral{Val: 1 * time.Minute}, + }, + }, + }, + }, + }, + }, + }, + // CREATE DATABASE statement { s: `CREATE DATABASE testdb`, diff --git a/influxql/scanner.go b/influxql/scanner.go index c6dab019c8..d071c85717 100644 --- a/influxql/scanner.go +++ b/influxql/scanner.go @@ -95,6 +95,8 @@ func (s *Scanner) Scan() (tok Token, pos Pos, lit string) { return COMMA, pos, "" case ';': return SEMICOLON, pos, "" + case ':': + return COLON, pos, "" } return ILLEGAL, pos, string(ch0) diff --git a/influxql/token.go b/influxql/token.go index ef5a473af6..ae8e7b3e46 100644 --- a/influxql/token.go +++ b/influxql/token.go @@ -50,6 +50,7 @@ const ( LPAREN // ( RPAREN // ) COMMA // , + COLON // : SEMICOLON // ; DOT // . @@ -160,6 +161,7 @@ var tokens = [...]string{ LPAREN: "(", RPAREN: ")", COMMA: ",", + COLON: ":", SEMICOLON: ";", DOT: ".", diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index 77c1ae66a6..f6593dee7c 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -23,7 +23,7 @@ const ( // ContinuousQuerier represents a service that executes continuous queries. type ContinuousQuerier interface { // Run executes the named query in the named database. Blank database or name matches all. - Run(database, name string) error + Run(database, name string, t time.Time) error } // queryExecutor is an internal interface to make testing easier. @@ -43,6 +43,28 @@ type pointsWriter interface { WritePoints(p *cluster.WritePointsRequest) error } +// RunRequest is a request to run one or more CQs. +type RunRequest struct { + // Now tells the CQ serivce what the current time is. + Now time.Time + // CQs tells the CQ service which queries to run. + // If nil, all queries will be run. + CQs []string +} + +// matches returns true if the CQ matches one of the requested CQs. +func (rr *RunRequest) matches(cq *meta.ContinuousQueryInfo) bool { + if rr.CQs == nil { + return true + } + for _, q := range rr.CQs { + if q == cq.Name { + return true + } + } + return false +} + // Service manages continuous query execution. type Service struct { MetaStore metaStore @@ -51,7 +73,7 @@ type Service struct { Config *Config RunInterval time.Duration // RunCh can be used by clients to signal service to run CQs. - RunCh chan struct{} + RunCh chan *RunRequest Logger *log.Logger loggingEnabled bool // lastRuns maps CQ name to last time it was run. @@ -65,7 +87,7 @@ func NewService(c Config) *Service { s := &Service{ Config: &c, RunInterval: time.Second, - RunCh: make(chan struct{}), + RunCh: make(chan *RunRequest), loggingEnabled: c.LogEnabled, Logger: log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags), lastRuns: map[string]time.Time{}, @@ -112,7 +134,7 @@ func (s *Service) SetLogger(l *log.Logger) { } // Run runs the specified continuous query, or all CQs if none is specified. -func (s *Service) Run(database, name string) error { +func (s *Service) Run(database, name string, t time.Time) error { var dbs []meta.DatabaseInfo if database != "" { @@ -145,7 +167,7 @@ func (s *Service) Run(database, name string) error { } // Signal the background routine to run CQs. - s.RunCh <- struct{}{} + s.RunCh <- &RunRequest{Now: t} return nil } @@ -158,21 +180,21 @@ func (s *Service) backgroundLoop() { case <-s.stop: s.Logger.Println("continuous query service terminating") return - case <-s.RunCh: + case req := <-s.RunCh: if s.MetaStore.IsLeader() { - s.Logger.Print("running continuous queries by request") - s.runContinuousQueries() + s.Logger.Printf("running continuous queries by request for time: %v", req.Now.UnixNano()) + s.runContinuousQueries(req) } case <-time.After(s.RunInterval): if s.MetaStore.IsLeader() { - s.runContinuousQueries() + s.runContinuousQueries(&RunRequest{Now: time.Now()}) } } } } // runContinuousQueries gets CQs from the meta store and runs them. -func (s *Service) runContinuousQueries() { +func (s *Service) runContinuousQueries(req *RunRequest) { // Get list of all databases. dbs, err := s.MetaStore.Databases() if err != nil { @@ -183,7 +205,10 @@ func (s *Service) runContinuousQueries() { for _, db := range dbs { // TODO: distribute across nodes for _, cq := range db.ContinuousQueries { - if err := s.ExecuteContinuousQuery(&db, &cq); err != nil { + if !req.matches(&cq) { + continue + } + if err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil { s.Logger.Printf("error executing query: %s: err = %s", cq.Query, err) } } @@ -191,7 +216,7 @@ func (s *Service) runContinuousQueries() { } // ExecuteContinuousQuery executes a single CQ. -func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo) error { +func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) error { // TODO: re-enable stats //s.stats.Inc("continuousQueryExecuted") @@ -219,9 +244,9 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti } // We're about to run the query so store the time. - now := time.Now() - cq.LastRun = now - s.lastRuns[cqi.Name] = now + lastRun := time.Now() + cq.LastRun = lastRun + s.lastRuns[cqi.Name] = lastRun // Get the group by interval. interval, err := cq.q.GroupByInterval() @@ -288,12 +313,6 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { return err } - // Drain results - defer func() { - for _ = range ch { - } - }() - // Read all rows from the result channel. points := make([]tsdb.Point, 0, 100) for result := range ch { @@ -302,8 +321,13 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { } for _, row := range result.Series { + // Get the measurement name for the result. + measurement := cq.intoMeasurement() + if measurement == "" { + measurement = row.Name + } // Convert the result row to points. - part, err := s.convertRowToPoints(cq.intoMeasurement(), row) + part, err := s.convertRowToPoints(measurement, row) if err != nil { log.Println(err) continue @@ -346,7 +370,7 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { } if s.loggingEnabled { - s.Logger.Printf("wrote %d point(s) to %s.%s.%s", len(points), cq.intoDB(), cq.intoRP(), cq.Info.Name) + s.Logger.Printf("wrote %d point(s) to %s.%s", len(points), cq.intoDB(), cq.intoRP()) } return nil @@ -393,7 +417,13 @@ type ContinuousQuery struct { q *influxql.SelectStatement } -func (cq *ContinuousQuery) intoDB() string { return cq.q.Target.Measurement.Database } +func (cq *ContinuousQuery) intoDB() string { + if cq.q.Target.Measurement.Database != "" { + return cq.q.Target.Measurement.Database + } + return cq.Database +} + func (cq *ContinuousQuery) intoRP() string { return cq.q.Target.Measurement.RetentionPolicy } func (cq *ContinuousQuery) setIntoRP(rp string) { cq.q.Target.Measurement.RetentionPolicy = rp } func (cq *ContinuousQuery) intoMeasurement() string { return cq.q.Target.Measurement.Name } diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index dfe0f76112..30c4987977 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "log" + "strings" "sync" "testing" "time" @@ -36,8 +37,8 @@ func TestOpenAndClose(t *testing.T) { } } -// Test ExecuteContinuousQuery happy path. -func TestExecuteContinuousQuery_HappyPath(t *testing.T) { +// Test ExecuteContinuousQuery. +func TestExecuteContinuousQuery(t *testing.T) { s := NewTestService(t) dbis, _ := s.MetaStore.Databases() dbi := dbis[0] @@ -55,14 +56,53 @@ func TestExecuteContinuousQuery_HappyPath(t *testing.T) { return nil } - err := s.ExecuteContinuousQuery(&dbi, &cqi) + err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()) + if err != nil { + t.Error(err) + } +} + +// Test ExecuteContinuousQuery when INTO measurements are taken from the FROM clause. +func TestExecuteContinuousQuery_ReferenceSource(t *testing.T) { + s := NewTestService(t) + dbis, _ := s.MetaStore.Databases() + dbi := dbis[2] + cqi := dbi.ContinuousQueries[0] + + rowCnt := 2 + pointCnt := 1 + qe := s.QueryExecutor.(*QueryExecutor) + qe.Results = []*influxql.Result{genResult(rowCnt, pointCnt)} + + pw := s.PointsWriter.(*PointsWriter) + pw.WritePointsFn = func(p *cluster.WritePointsRequest) error { + if len(p.Points) != pointCnt*rowCnt { + return fmt.Errorf("exp = %d, got = %d", pointCnt, len(p.Points)) + } + + exp := "cpu,host=server01 value=0" + got := p.Points[0].String() + if !strings.Contains(got, exp) { + return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp) + } + + exp = "cpu2,host=server01 value=0" + got = p.Points[1].String() + if !strings.Contains(got, exp) { + return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp) + } + + return nil + } + + err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()) if err != nil { t.Error(err) } } // Test the service happy path. -func TestService_HappyPath(t *testing.T) { +func TestContinuousQueryService(t *testing.T) { s := NewTestService(t) pointCnt := 100 @@ -70,7 +110,7 @@ func TestService_HappyPath(t *testing.T) { qe.Results = []*influxql.Result{genResult(1, pointCnt)} pw := s.PointsWriter.(*PointsWriter) - ch := make(chan int, 5) + ch := make(chan int, 10) defer close(ch) pw.WritePointsFn = func(p *cluster.WritePointsRequest) error { ch <- len(p.Points) @@ -87,7 +127,7 @@ func TestService_HappyPath(t *testing.T) { } // Test Run method. -func TestService_Run(t *testing.T) { +func TestContinuousQueryService_Run(t *testing.T) { s := NewTestService(t) // Set RunInterval high so we can trigger using Run method. @@ -97,7 +137,7 @@ func TestService_Run(t *testing.T) { s.Config.RecomputePreviousN = 0 done := make(chan struct{}) - expectCallCnt := 2 + expectCallCnt := 3 callCnt := 0 // Set a callback for ExecuteQuery. @@ -112,7 +152,7 @@ func TestService_Run(t *testing.T) { s.Open() // Trigger service to run all CQs. - s.Run("", "") + s.Run("", "", time.Now()) // Shouldn't time out. if err := wait(done, 100*time.Millisecond); err != nil { t.Error(err) @@ -127,7 +167,7 @@ func TestService_Run(t *testing.T) { expectCallCnt = 1 callCnt = 0 s.Open() - s.Run("db", "cq") + s.Run("db", "cq", time.Now()) // Shouldn't time out. if err := wait(done, 100*time.Millisecond); err != nil { t.Error(err) @@ -140,7 +180,7 @@ func TestService_Run(t *testing.T) { } // Test service when not the cluster leader (CQs shouldn't run). -func TestService_NotLeader(t *testing.T) { +func TestContinuousQueryService_NotLeader(t *testing.T) { s := NewTestService(t) // Set RunInterval high so we can test triggering with the RunCh below. s.RunInterval = 10 * time.Second @@ -156,7 +196,7 @@ func TestService_NotLeader(t *testing.T) { s.Open() // Trigger service to run CQs. - s.RunCh <- struct{}{} + s.RunCh <- &RunRequest{Now: time.Now()} // Expect timeout error because ExecuteQuery callback wasn't called. if err := wait(done, 100*time.Millisecond); err == nil { t.Error(err) @@ -165,7 +205,7 @@ func TestService_NotLeader(t *testing.T) { } // Test service behavior when meta store fails to get databases. -func TestService_MetaStoreFailsToGetDatabases(t *testing.T) { +func TestContinuousQueryService_MetaStoreFailsToGetDatabases(t *testing.T) { s := NewTestService(t) // Set RunInterval high so we can test triggering with the RunCh below. s.RunInterval = 10 * time.Second @@ -181,7 +221,7 @@ func TestService_MetaStoreFailsToGetDatabases(t *testing.T) { s.Open() // Trigger service to run CQs. - s.RunCh <- struct{}{} + s.RunCh <- &RunRequest{Now: time.Now()} // Expect timeout error because ExecuteQuery callback wasn't called. if err := wait(done, 100*time.Millisecond); err == nil { t.Error(err) @@ -197,21 +237,21 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) { cqi := dbi.ContinuousQueries[0] cqi.Query = `this is not a query` - err := s.ExecuteContinuousQuery(&dbi, &cqi) + err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()) if err == nil { t.Error("expected error but got nil") } // Valid query but invalid continuous query. cqi.Query = `SELECT * FROM cpu` - err = s.ExecuteContinuousQuery(&dbi, &cqi) + err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()) if err == nil { t.Error("expected error but got nil") } // Group by requires aggregate. cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s)` - err = s.ExecuteContinuousQuery(&dbi, &cqi) + err = s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()) if err == nil { t.Error("expected error but got nil") } @@ -227,7 +267,7 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) { dbi := dbis[0] cqi := dbi.ContinuousQueries[0] - err := s.ExecuteContinuousQuery(&dbi, &cqi) + err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now()) if err != expectedErr { t.Errorf("exp = %s, got = %v", expectedErr, err) } @@ -252,6 +292,8 @@ func NewTestService(t *testing.T) *Service { ms.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s) END`) ms.CreateDatabase("db2", "default") ms.CreateContinuousQuery("db2", "cq2", `CREATE CONTINUOUS QUERY cq2 ON db2 BEGIN SELECT mean(value) INTO cpu_mean FROM cpu WHERE time > now() - 10m GROUP BY time(1m) END`) + ms.CreateDatabase("db3", "default") + ms.CreateContinuousQuery("db3", "cq3", `CREATE CONTINUOUS QUERY cq3 ON db3 BEGIN SELECT mean(value) INTO "1hAverages".:MEASUREMENT FROM /cpu[0-9]?/ GROUP BY time(10s) END`) return s } @@ -471,6 +513,9 @@ func genResult(rowCnt, valCnt int) *influxql.Result { Columns: []string{"time", "value"}, Values: vals, } + if len(rows) > 0 { + row.Name = fmt.Sprintf("cpu%d", len(rows)+1) + } rows = append(rows, row) } return &influxql.Result{ diff --git a/services/httpd/handler.go b/services/httpd/handler.go index b1011a72c7..efc650656b 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -182,9 +182,25 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R db := q.Get("db") // Get the name of the CQ to run (blank means run all). name := q.Get("name") + // Get the time for which the CQ should be evaluated. + var t time.Time + var err error + s := q.Get("time") + if s != "" { + t, err = time.Parse(time.RFC3339Nano, s) + if err != nil { + // Try parsing as an int64 nanosecond timestamp. + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + t = time.Unix(0, i) + } + } // Pass the request to the CQ service. - if err := h.ContinuousQuerier.Run(db, name); err != nil { + if err := h.ContinuousQuerier.Run(db, name, t); err != nil { w.WriteHeader(http.StatusBadRequest) return } diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index 07120e7417..0c5234a5c1 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -868,7 +868,9 @@ func (q *QueryExecutor) normalizeStatement(stmt influxql.Statement, defaultDatab // normalizeMeasurement inserts the default database or policy into all measurement names, // if required. func (q *QueryExecutor) normalizeMeasurement(m *influxql.Measurement, defaultDatabase string) error { - if m.Name == "" && m.Regex == nil { + // Targets (measurements in an INTO clause) can have blank names, which means it will be + // the same as the measurement name it came from in the FROM clause. + if !m.IsTarget && m.Name == "" && m.Regex == nil { return errors.New("invalid measurement") }