From 66001cfbb51081af7f606fabeb6e4120466c331e Mon Sep 17 00:00:00 2001 From: David Norton Date: Wed, 2 Sep 2015 11:40:47 -0400 Subject: [PATCH] fix #2555: add integration tests for CQs --- cmd/influxd/run/server_helpers_test.go | 30 ++++++- cmd/influxd/run/server_test.go | 113 ++++++++++++++++++++----- services/continuous_querier/service.go | 61 ++++++++----- services/httpd/handler.go | 18 +++- 4 files changed, 178 insertions(+), 44 deletions(-) diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index f8e6c5b1c6..8f50b20571 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -2,6 +2,7 @@ package run_test import ( + "bytes" "encoding/json" "fmt" "io" @@ -105,10 +106,14 @@ func (s *Server) QueryWithParams(query string, values url.Values) (results strin values = url.Values{} } values.Set("q", query) - resp, err := http.Get(s.URL() + "/query?" + values.Encode()) + return s.HTTPGet(s.URL() + "/query?" + values.Encode()) +} + +// HTTPGet makes an HTTP GET request to the server and returns the response. +func (s *Server) HTTPGet(url string) (results string, err error) { + resp, err := http.Get(url) if err != nil { return "", err - //} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusBadRequest { } body := string(MustReadAll(resp.Body)) switch resp.StatusCode { @@ -124,6 +129,27 @@ func (s *Server) QueryWithParams(query string, values url.Values) (results strin } } +// HTTPPost makes an HTTP POST request to the server and returns the response. +func (s *Server) HTTPPost(url string, content []byte) (results string, err error) { + buf := bytes.NewBuffer(content) + resp, err := http.Post(url, "application/json", buf) + if err != nil { + return "", err + } + body := string(MustReadAll(resp.Body)) + switch resp.StatusCode { + case http.StatusBadRequest: + if !expectPattern(".*error parsing query*.", body) { + return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body) + } + return body, nil + case http.StatusOK, http.StatusNoContent: + return body, nil + default: + return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body) + } +} + // Write executes a write against the server and returns the results. func (s *Server) Write(db, rp, body string, params url.Values) (results string, err error) { if params == nil { diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 0f6d4cecc1..23c109ec49 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -3631,7 +3631,7 @@ func TestServer_Query_ShowFieldKeys(t *testing.T) { } } -func TestServer_Query_CreateContinuousQuery(t *testing.T) { +func TestServer_ContinuousQuery(t *testing.T) { t.Parallel() s := OpenServer(NewConfig(), "") defer s.Close() @@ -3643,37 +3643,110 @@ func TestServer_Query_CreateContinuousQuery(t *testing.T) { t.Fatal(err) } - test := NewTest("db0", "rp0") + runTest := func(test *Test, t *testing.T) { + for i, query := range test.queries { + if i == 0 { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } + } + // Start times of CQ intervals. + interval0 := time.Now().Add(-time.Second).Round(time.Second * 5) + interval1 := interval0.Add(-time.Second * 5) + interval2 := interval0.Add(-time.Second * 10) + interval3 := interval0.Add(-time.Second * 15) + + writes := []string{ + // Point too far in the past for CQ to pick up. + fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, interval3.Add(time.Second).UnixNano()), + + // Points two intervals ago. + fmt.Sprintf(`cpu,host=server01 value=100 %d`, interval2.Add(time.Second).UnixNano()), + fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, interval2.Add(time.Second*2).UnixNano()), + fmt.Sprintf(`cpu,host=server01,region=useast value=100 %d`, interval2.Add(time.Second*3).UnixNano()), + + // Points one interval ago. + fmt.Sprintf(`gpu,host=server02,region=useast value=100 %d`, interval1.Add(time.Second).UnixNano()), + fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, interval1.Add(time.Second*2).UnixNano()), + + // Points in the current interval. + fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, interval0.Add(time.Second).UnixNano()), + fmt.Sprintf(`disk,host=server03,region=caeast value=100 %d`, interval0.Add(time.Second*2).UnixNano()), + } + + test := NewTest("db0", "rp0") + test.write = strings.Join(writes, "\n") test.addQueries([]*Query{ &Query{ - name: "create continuous query", - command: `CREATE CONTINUOUS QUERY "my.query" ON db0 BEGIN SELECT count(value) INTO measure1 FROM myseries GROUP BY time(10m) END`, + name: `create another retention policy for CQ to write into`, + command: `CREATE RETENTION POLICY rp1 ON db0 DURATION 1h REPLICATION 1`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "create continuous query with backreference", + command: `CREATE CONTINUOUS QUERY "cq1" ON db0 BEGIN SELECT count(value) INTO "rp1".:MEASUREMENT FROM /[cg]pu/ GROUP BY time(5s) END`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: `create another retention policy for CQ to write into`, + command: `CREATE RETENTION POLICY rp2 ON db0 DURATION 1h REPLICATION 1`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "create continuous query with backreference and group by time", + command: `CREATE CONTINUOUS QUERY "cq2" ON db0 BEGIN SELECT count(value) INTO "rp2".:MEASUREMENT FROM /[cg]pu/ GROUP BY time(5s), * END`, exp: `{"results":[{}]}`, }, &Query{ name: `show continuous queries`, command: `SHOW CONTINUOUS QUERIES`, - exp: `{"results":[{"series":[{"name":"db0","columns":["name","query"],"values":[["my.query","CREATE CONTINUOUS QUERY \"my.query\" ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp0\".measure1 FROM \"db0\".\"rp0\".myseries GROUP BY time(10m) END"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"db0","columns":["name","query"],"values":[["cq1","CREATE CONTINUOUS QUERY cq1 ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp1\".:MEASUREMENT FROM \"db0\".\"rp0\"./[cg]pu/ GROUP BY time(5s) END"],["cq2","CREATE CONTINUOUS QUERY cq2 ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp2\".:MEASUREMENT FROM \"db0\".\"rp0\"./[cg]pu/ GROUP BY time(5s), * END"]]}]}]}`, }, }...) - for i, query := range test.queries { - if i == 0 { - if err := test.init(s); err != nil { - t.Fatalf("test init failed: %s", err) - } - } - if query.skip { - t.Logf("SKIP:: %s", query.name) - continue - } - if err := query.Execute(s); err != nil { - t.Error(query.Error(err)) - } else if !query.success() { - t.Error(query.failureMessage()) - } + // Run first test to create CQs. + runTest(&test, t) + + // Trigger CQs to run. + u := fmt.Sprintf("%s/data/process_continuous_queries?time=%d", s.URL(), interval0.UnixNano()) + if _, err := s.HTTPPost(u, nil); err != nil { + t.Fatal(err) } + + // Wait for CQs to run. TODO: fix this ugly hack + time.Sleep(time.Second * 2) + + // Setup tests to check the CQ results. + test2 := NewTest("db0", "rp1") + test2.addQueries([]*Query{ + &Query{ + name: "check results of cq1", + command: `SELECT * FROM "rp1"./[cg]pu/`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","count","host","region","value"],"values":[["` + interval2.UTC().Format(time.RFC3339Nano) + `",3,null,null,null]]},{"name":"gpu","columns":["time","count","host","region","value"],"values":[["` + interval1.UTC().Format(time.RFC3339Nano) + `",2,null,null,null],["` + interval0.UTC().Format(time.RFC3339Nano) + `",1,null,null,null]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "check results of cq2", + command: `SELECT * FROM "rp2"./[cg]pu/`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","count","host","region","value"],"values":[["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","uswest",null],["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","",null],["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","useast",null]]},{"name":"gpu","columns":["time","count","host","region","value"],"values":[["` + interval1.UTC().Format(time.RFC3339Nano) + `",1,"server02","useast",null],["` + interval1.UTC().Format(time.RFC3339Nano) + `",1,"server03","caeast",null],["` + interval0.UTC().Format(time.RFC3339Nano) + `",1,"server03","caeast",null]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + }...) + + // Run second test to check CQ results. + runTest(&test2, t) } // Tests that a known CQ query with concurrent writes does not deadlock the server diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index 9a3a2c65c4..bac1aaecb3 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -23,7 +23,7 @@ const ( // ContinuousQuerier represents a service that executes continuous queries. type ContinuousQuerier interface { // Run executes the named query in the named database. Blank database or name matches all. - Run(database, name string) error + Run(database, name string, t time.Time) error } // queryExecutor is an internal interface to make testing easier. @@ -43,6 +43,28 @@ type pointsWriter interface { WritePoints(p *cluster.WritePointsRequest) error } +// RunRequest is a request to run one or more CQs. +type RunRequest struct { + // Now tells the CQ serivce what the current time is. + Now time.Time + // CQs tells the CQ service which queries to run. + // If nil, all queries will be run. + CQs []string +} + +// matches returns true if the CQ matches one of the requested CQs. +func (rr *RunRequest) matches(cq *meta.ContinuousQueryInfo) bool { + if rr.CQs == nil { + return true + } + for _, q := range rr.CQs { + if q == cq.Name { + return true + } + } + return false +} + // Service manages continuous query execution. type Service struct { MetaStore metaStore @@ -51,7 +73,7 @@ type Service struct { Config *Config RunInterval time.Duration // RunCh can be used by clients to signal service to run CQs. - RunCh chan struct{} + RunCh chan *RunRequest Logger *log.Logger loggingEnabled bool // lastRuns maps CQ name to last time it was run. @@ -65,7 +87,7 @@ func NewService(c Config) *Service { s := &Service{ Config: &c, RunInterval: time.Second, - RunCh: make(chan struct{}), + RunCh: make(chan *RunRequest), loggingEnabled: c.LogEnabled, Logger: log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags), lastRuns: map[string]time.Time{}, @@ -112,7 +134,7 @@ func (s *Service) SetLogger(l *log.Logger) { } // Run runs the specified continuous query, or all CQs if none is specified. -func (s *Service) Run(database, name string) error { +func (s *Service) Run(database, name string, t time.Time) error { var dbs []meta.DatabaseInfo if database != "" { @@ -145,7 +167,7 @@ func (s *Service) Run(database, name string) error { } // Signal the background routine to run CQs. - s.RunCh <- struct{}{} + s.RunCh <- &RunRequest{Now: t} return nil } @@ -158,21 +180,21 @@ func (s *Service) backgroundLoop() { case <-s.stop: s.Logger.Println("continuous query service terminating") return - case <-s.RunCh: + case req := <-s.RunCh: if s.MetaStore.IsLeader() { - s.Logger.Print("running continuous queries by request") - s.runContinuousQueries() + s.Logger.Printf("running continuous queries by request for time: %v", req.Now.UnixNano()) + s.runContinuousQueries(req) } case <-time.After(s.RunInterval): if s.MetaStore.IsLeader() { - s.runContinuousQueries() + s.runContinuousQueries(&RunRequest{Now: time.Now()}) } } } } // runContinuousQueries gets CQs from the meta store and runs them. -func (s *Service) runContinuousQueries() { +func (s *Service) runContinuousQueries(req *RunRequest) { // Get list of all databases. dbs, err := s.MetaStore.Databases() if err != nil { @@ -183,7 +205,10 @@ func (s *Service) runContinuousQueries() { for _, db := range dbs { // TODO: distribute across nodes for _, cq := range db.ContinuousQueries { - if err := s.ExecuteContinuousQuery(&db, &cq); err != nil { + if !req.matches(&cq) { + continue + } + if err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil { s.Logger.Printf("error executing query: %s: err = %s", cq.Query, err) } } @@ -191,7 +216,7 @@ func (s *Service) runContinuousQueries() { } // ExecuteContinuousQuery executes a single CQ. -func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo) error { +func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) error { // TODO: re-enable stats //s.stats.Inc("continuousQueryExecuted") @@ -219,9 +244,9 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti } // We're about to run the query so store the time. - now := time.Now() - cq.LastRun = now - s.lastRuns[cqi.Name] = now + lastRun := time.Now() + cq.LastRun = lastRun + s.lastRuns[cqi.Name] = lastRun // Get the group by interval. interval, err := cq.q.GroupByInterval() @@ -288,12 +313,6 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { return err } - // Drain results - defer func() { - for _ = range ch { - } - }() - // Read all rows from the result channel. points := make([]tsdb.Point, 0, 100) for result := range ch { diff --git a/services/httpd/handler.go b/services/httpd/handler.go index b1011a72c7..efc650656b 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -182,9 +182,25 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R db := q.Get("db") // Get the name of the CQ to run (blank means run all). name := q.Get("name") + // Get the time for which the CQ should be evaluated. + var t time.Time + var err error + s := q.Get("time") + if s != "" { + t, err = time.Parse(time.RFC3339Nano, s) + if err != nil { + // Try parsing as an int64 nanosecond timestamp. + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + t = time.Unix(0, i) + } + } // Pass the request to the CQ service. - if err := h.ContinuousQuerier.Run(db, name); err != nil { + if err := h.ContinuousQuerier.Run(db, name, t); err != nil { w.WriteHeader(http.StatusBadRequest) return }