fix #2555: add integration tests for CQs

parent 021a6f5453
commit 66001cfbb5

@@ -2,6 +2,7 @@
 package run_test

 import (
+	"bytes"
 	"encoding/json"
 	"fmt"
 	"io"

@@ -105,10 +106,14 @@ func (s *Server) QueryWithParams(query string, values url.Values) (results strin
 		values = url.Values{}
 	}
 	values.Set("q", query)
-	resp, err := http.Get(s.URL() + "/query?" + values.Encode())
+	return s.HTTPGet(s.URL() + "/query?" + values.Encode())
+}
+
+// HTTPGet makes an HTTP GET request to the server and returns the response.
+func (s *Server) HTTPGet(url string) (results string, err error) {
+	resp, err := http.Get(url)
 	if err != nil {
 		return "", err
-		//} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusBadRequest {
 	}
 	body := string(MustReadAll(resp.Body))
 	switch resp.StatusCode {

@@ -124,6 +129,27 @@ func (s *Server) QueryWithParams(query string, values url.Values) (results strin
 	}
 }

+// HTTPPost makes an HTTP POST request to the server and returns the response.
+func (s *Server) HTTPPost(url string, content []byte) (results string, err error) {
+	buf := bytes.NewBuffer(content)
+	resp, err := http.Post(url, "application/json", buf)
+	if err != nil {
+		return "", err
+	}
+	body := string(MustReadAll(resp.Body))
+	switch resp.StatusCode {
+	case http.StatusBadRequest:
+		if !expectPattern(".*error parsing query*.", body) {
+			return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body)
+		}
+		return body, nil
+	case http.StatusOK, http.StatusNoContent:
+		return body, nil
+	default:
+		return "", fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body)
+	}
+}
+
 // Write executes a write against the server and returns the results.
 func (s *Server) Write(db, rp, body string, params url.Values) (results string, err error) {
 	if params == nil {

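Note: a rough usage sketch for the two helpers above, assuming a Server s opened with OpenServer(NewConfig(), "") and a *testing.T named t as in the tests further down; the SHOW DATABASES query is only an example:

	// HTTPGet tolerates 200 and the "error parsing query" 400 case; anything else is an error.
	if body, err := s.HTTPGet(s.URL() + "/query?" + url.Values{"q": []string{"SHOW DATABASES"}}.Encode()); err != nil {
		t.Fatal(err)
	} else {
		t.Log(body)
	}

	// HTTPPost sends a JSON POST; a nil body is fine, which is how the CQ trigger below uses it.
	if _, err := s.HTTPPost(s.URL()+"/data/process_continuous_queries", nil); err != nil {
		t.Fatal(err)
	}
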
@@ -3631,7 +3631,7 @@ func TestServer_Query_ShowFieldKeys(t *testing.T) {
 	}
 }

-func TestServer_Query_CreateContinuousQuery(t *testing.T) {
+func TestServer_ContinuousQuery(t *testing.T) {
 	t.Parallel()
 	s := OpenServer(NewConfig(), "")
 	defer s.Close()

@@ -3643,37 +3643,110 @@ func TestServer_Query_CreateContinuousQuery(t *testing.T) {
 		t.Fatal(err)
 	}

-	test := NewTest("db0", "rp0")
+	runTest := func(test *Test, t *testing.T) {
+		for i, query := range test.queries {
+			if i == 0 {
+				if err := test.init(s); err != nil {
+					t.Fatalf("test init failed: %s", err)
+				}
+			}
+			if query.skip {
+				t.Logf("SKIP:: %s", query.name)
+				continue
+			}
+			if err := query.Execute(s); err != nil {
+				t.Error(query.Error(err))
+			} else if !query.success() {
+				t.Error(query.failureMessage())
+			}
+		}
+	}
+
+	// Start times of CQ intervals.
+	interval0 := time.Now().Add(-time.Second).Round(time.Second * 5)
+	interval1 := interval0.Add(-time.Second * 5)
+	interval2 := interval0.Add(-time.Second * 10)
+	interval3 := interval0.Add(-time.Second * 15)
+
+	writes := []string{
+		// Point too far in the past for CQ to pick up.
+		fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, interval3.Add(time.Second).UnixNano()),
+
+		// Points two intervals ago.
+		fmt.Sprintf(`cpu,host=server01 value=100 %d`, interval2.Add(time.Second).UnixNano()),
+		fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, interval2.Add(time.Second*2).UnixNano()),
+		fmt.Sprintf(`cpu,host=server01,region=useast value=100 %d`, interval2.Add(time.Second*3).UnixNano()),
+
+		// Points one interval ago.
+		fmt.Sprintf(`gpu,host=server02,region=useast value=100 %d`, interval1.Add(time.Second).UnixNano()),
+		fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, interval1.Add(time.Second*2).UnixNano()),
+
+		// Points in the current interval.
+		fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, interval0.Add(time.Second).UnixNano()),
+		fmt.Sprintf(`disk,host=server03,region=caeast value=100 %d`, interval0.Add(time.Second*2).UnixNano()),
+	}
+
+	test := NewTest("db0", "rp0")
+	test.write = strings.Join(writes, "\n")
 	test.addQueries([]*Query{
 		&Query{
-			name:    "create continuous query",
-			command: `CREATE CONTINUOUS QUERY "my.query" ON db0 BEGIN SELECT count(value) INTO measure1 FROM myseries GROUP BY time(10m) END`,
+			name:    `create another retention policy for CQ to write into`,
+			command: `CREATE RETENTION POLICY rp1 ON db0 DURATION 1h REPLICATION 1`,
 			exp:     `{"results":[{}]}`,
 		},
 		&Query{
+			name:    "create continuous query with backreference",
+			command: `CREATE CONTINUOUS QUERY "cq1" ON db0 BEGIN SELECT count(value) INTO "rp1".:MEASUREMENT FROM /[cg]pu/ GROUP BY time(5s) END`,
+			exp:     `{"results":[{}]}`,
+		},
+		&Query{
+			name:    `create another retention policy for CQ to write into`,
+			command: `CREATE RETENTION POLICY rp2 ON db0 DURATION 1h REPLICATION 1`,
+			exp:     `{"results":[{}]}`,
+		},
+		&Query{
+			name:    "create continuous query with backreference and group by time",
+			command: `CREATE CONTINUOUS QUERY "cq2" ON db0 BEGIN SELECT count(value) INTO "rp2".:MEASUREMENT FROM /[cg]pu/ GROUP BY time(5s), * END`,
+			exp:     `{"results":[{}]}`,
+		},
+		&Query{
 			name:    `show continuous queries`,
 			command: `SHOW CONTINUOUS QUERIES`,
-			exp:     `{"results":[{"series":[{"name":"db0","columns":["name","query"],"values":[["my.query","CREATE CONTINUOUS QUERY \"my.query\" ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp0\".measure1 FROM \"db0\".\"rp0\".myseries GROUP BY time(10m) END"]]}]}]}`,
+			exp:     `{"results":[{"series":[{"name":"db0","columns":["name","query"],"values":[["cq1","CREATE CONTINUOUS QUERY cq1 ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp1\".:MEASUREMENT FROM \"db0\".\"rp0\"./[cg]pu/ GROUP BY time(5s) END"],["cq2","CREATE CONTINUOUS QUERY cq2 ON db0 BEGIN SELECT count(value) INTO \"db0\".\"rp2\".:MEASUREMENT FROM \"db0\".\"rp0\"./[cg]pu/ GROUP BY time(5s), * END"]]}]}]}`,
 		},
 	}...)

-	for i, query := range test.queries {
-		if i == 0 {
-			if err := test.init(s); err != nil {
-				t.Fatalf("test init failed: %s", err)
-			}
-		}
-		if query.skip {
-			t.Logf("SKIP:: %s", query.name)
-			continue
-		}
-		if err := query.Execute(s); err != nil {
-			t.Error(query.Error(err))
-		} else if !query.success() {
-			t.Error(query.failureMessage())
-		}
-	}
+	// Run first test to create CQs.
+	runTest(&test, t)
+
+	// Trigger CQs to run.
+	u := fmt.Sprintf("%s/data/process_continuous_queries?time=%d", s.URL(), interval0.UnixNano())
+	if _, err := s.HTTPPost(u, nil); err != nil {
+		t.Fatal(err)
+	}
+
+	// Wait for CQs to run. TODO: fix this ugly hack
+	time.Sleep(time.Second * 2)
+
+	// Setup tests to check the CQ results.
+	test2 := NewTest("db0", "rp1")
+	test2.addQueries([]*Query{
+		&Query{
+			name:    "check results of cq1",
+			command: `SELECT * FROM "rp1"./[cg]pu/`,
+			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","count","host","region","value"],"values":[["` + interval2.UTC().Format(time.RFC3339Nano) + `",3,null,null,null]]},{"name":"gpu","columns":["time","count","host","region","value"],"values":[["` + interval1.UTC().Format(time.RFC3339Nano) + `",2,null,null,null],["` + interval0.UTC().Format(time.RFC3339Nano) + `",1,null,null,null]]}]}]}`,
+			params:  url.Values{"db": []string{"db0"}},
+		},
+		&Query{
+			name:    "check results of cq2",
+			command: `SELECT * FROM "rp2"./[cg]pu/`,
+			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","count","host","region","value"],"values":[["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","uswest",null],["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","",null],["` + interval2.UTC().Format(time.RFC3339Nano) + `",1,"server01","useast",null]]},{"name":"gpu","columns":["time","count","host","region","value"],"values":[["` + interval1.UTC().Format(time.RFC3339Nano) + `",1,"server02","useast",null],["` + interval1.UTC().Format(time.RFC3339Nano) + `",1,"server03","caeast",null],["` + interval0.UTC().Format(time.RFC3339Nano) + `",1,"server03","caeast",null]]}]}]}`,
+			params:  url.Values{"db": []string{"db0"}},
+		},
+	}...)
+
+	// Run second test to check CQ results.
+	runTest(&test2, t)
 }

 // Tests that a known CQ query with concurrent writes does not deadlock the server

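The expected values in the two check queries follow from how the write timestamps line up with 5-second buckets. A standalone sketch of that bucket arithmetic (standard library only; variable names mirror the test, and the bucket-alignment shown here is an assumption about why the expectations hold):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// interval0 is "now" (less a second) snapped to the nearest 5s boundary,
		// and each earlier interval steps back one 5s bucket, as in the test.
		interval0 := time.Now().Add(-time.Second).Round(time.Second * 5)
		interval2 := interval0.Add(-time.Second * 10)

		// A point written 1s into interval2 truncates back to interval2's start,
		// which is the timestamp the "check results of cq1" expectation reports.
		point := interval2.Add(time.Second)
		fmt.Println(point.Truncate(time.Second * 5).Equal(interval2)) // true
	}
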
@@ -23,7 +23,7 @@ const (
 // ContinuousQuerier represents a service that executes continuous queries.
 type ContinuousQuerier interface {
 	// Run executes the named query in the named database. Blank database or name matches all.
-	Run(database, name string) error
+	Run(database, name string, t time.Time) error
 }

 // queryExecutor is an internal interface to make testing easier.

@@ -43,6 +43,28 @@ type pointsWriter interface {
 	WritePoints(p *cluster.WritePointsRequest) error
 }

+// RunRequest is a request to run one or more CQs.
+type RunRequest struct {
+	// Now tells the CQ service what the current time is.
+	Now time.Time
+	// CQs tells the CQ service which queries to run.
+	// If nil, all queries will be run.
+	CQs []string
+}
+
+// matches returns true if the CQ matches one of the requested CQs.
+func (rr *RunRequest) matches(cq *meta.ContinuousQueryInfo) bool {
+	if rr.CQs == nil {
+		return true
+	}
+	for _, q := range rr.CQs {
+		if q == cq.Name {
+			return true
+		}
+	}
+	return false
+}
+
 // Service manages continuous query execution.
 type Service struct {
 	MetaStore metaStore

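In other words, a nil CQs list means "run everything" and a non-nil list filters by CQ name. A small sketch of that behavior from inside the continuous_querier package (the ContinuousQueryInfo literal is illustrative; only its Name field matters here):

	cq := meta.ContinuousQueryInfo{Name: "cq2"}

	all := &RunRequest{Now: time.Now()}                         // CQs == nil
	only1 := &RunRequest{Now: time.Now(), CQs: []string{"cq1"}} // explicit list

	fmt.Println(all.matches(&cq))   // true:  nil list matches every CQ
	fmt.Println(only1.matches(&cq)) // false: "cq2" is not in the requested list
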
@@ -51,7 +73,7 @@ type Service struct {
 	Config      *Config
 	RunInterval time.Duration
 	// RunCh can be used by clients to signal service to run CQs.
-	RunCh          chan struct{}
+	RunCh          chan *RunRequest
 	Logger         *log.Logger
 	loggingEnabled bool
 	// lastRuns maps CQ name to last time it was run.

@@ -65,7 +87,7 @@ func NewService(c Config) *Service {
 	s := &Service{
 		Config:         &c,
 		RunInterval:    time.Second,
-		RunCh:          make(chan struct{}),
+		RunCh:          make(chan *RunRequest),
 		loggingEnabled: c.LogEnabled,
 		Logger:         log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags),
 		lastRuns:       map[string]time.Time{},

@@ -112,7 +134,7 @@ func (s *Service) SetLogger(l *log.Logger) {
 }

 // Run runs the specified continuous query, or all CQs if none is specified.
-func (s *Service) Run(database, name string) error {
+func (s *Service) Run(database, name string, t time.Time) error {
 	var dbs []meta.DatabaseInfo

 	if database != "" {

@@ -145,7 +167,7 @@ func (s *Service) Run(database, name string) error {
 	}

 	// Signal the background routine to run CQs.
-	s.RunCh <- struct{}{}
+	s.RunCh <- &RunRequest{Now: t}

 	return nil
 }

@@ -158,21 +180,21 @@ func (s *Service) backgroundLoop() {
 		case <-s.stop:
 			s.Logger.Println("continuous query service terminating")
 			return
-		case <-s.RunCh:
+		case req := <-s.RunCh:
 			if s.MetaStore.IsLeader() {
-				s.Logger.Print("running continuous queries by request")
-				s.runContinuousQueries()
+				s.Logger.Printf("running continuous queries by request for time: %v", req.Now.UnixNano())
+				s.runContinuousQueries(req)
 			}
 		case <-time.After(s.RunInterval):
 			if s.MetaStore.IsLeader() {
-				s.runContinuousQueries()
+				s.runContinuousQueries(&RunRequest{Now: time.Now()})
 			}
 		}
 	}
 }

 // runContinuousQueries gets CQs from the meta store and runs them.
-func (s *Service) runContinuousQueries() {
+func (s *Service) runContinuousQueries(req *RunRequest) {
 	// Get list of all databases.
 	dbs, err := s.MetaStore.Databases()
 	if err != nil {

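Given the loop above, a manual trigger is just a send on RunCh. A sketch of how a service-level test might drive it (assumes an opened Service s whose MetaStore reports it as leader, per the IsLeader checks above):

	// Run only cq1, evaluated as if the current time were t0.
	t0 := time.Now()
	s.RunCh <- &RunRequest{Now: t0, CQs: []string{"cq1"}}
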
@@ -183,7 +205,10 @@ func (s *Service) runContinuousQueries() {
 	for _, db := range dbs {
 		// TODO: distribute across nodes
 		for _, cq := range db.ContinuousQueries {
-			if err := s.ExecuteContinuousQuery(&db, &cq); err != nil {
+			if !req.matches(&cq) {
+				continue
+			}
+			if err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil {
 				s.Logger.Printf("error executing query: %s: err = %s", cq.Query, err)
 			}
 		}

@@ -191,7 +216,7 @@
 }

 // ExecuteContinuousQuery executes a single CQ.
-func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo) error {
+func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) error {
 	// TODO: re-enable stats
 	//s.stats.Inc("continuousQueryExecuted")

@@ -219,9 +244,9 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
 	}

 	// We're about to run the query so store the time.
-	now := time.Now()
-	cq.LastRun = now
-	s.lastRuns[cqi.Name] = now
+	lastRun := time.Now()
+	cq.LastRun = lastRun
+	s.lastRuns[cqi.Name] = lastRun

 	// Get the group by interval.
 	interval, err := cq.q.GroupByInterval()

@@ -288,12 +313,6 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
 		return err
 	}

-	// Drain results
-	defer func() {
-		for _ = range ch {
-		}
-	}()
-
 	// Read all rows from the result channel.
 	points := make([]tsdb.Point, 0, 100)
 	for result := range ch {

@@ -182,9 +182,25 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R
 	db := q.Get("db")
 	// Get the name of the CQ to run (blank means run all).
 	name := q.Get("name")
+	// Get the time for which the CQ should be evaluated.
+	var t time.Time
+	var err error
+	s := q.Get("time")
+	if s != "" {
+		t, err = time.Parse(time.RFC3339Nano, s)
+		if err != nil {
+			// Try parsing as an int64 nanosecond timestamp.
+			i, err := strconv.ParseInt(s, 10, 64)
+			if err != nil {
+				w.WriteHeader(http.StatusBadRequest)
+				return
+			}
+			t = time.Unix(0, i)
+		}
+	}

 	// Pass the request to the CQ service.
-	if err := h.ContinuousQuerier.Run(db, name); err != nil {
+	if err := h.ContinuousQuerier.Run(db, name, t); err != nil {
 		w.WriteHeader(http.StatusBadRequest)
 		return
 	}

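So the time query parameter may be RFC3339Nano or integer nanoseconds, with db and name as optional filters. A rough client-side sketch (the host:port is a placeholder for wherever the handler is mounted; assumes the fmt, log, net/http, and time imports):

	base := "http://localhost:8086/data/process_continuous_queries" // placeholder address

	// RFC3339Nano form:
	if _, err := http.Post(base+"?time=2015-05-13T00:00:00Z&db=db0", "application/json", nil); err != nil {
		log.Fatal(err)
	}

	// ...or integer nanoseconds, matching what the integration test sends:
	if _, err := http.Post(fmt.Sprintf("%s?time=%d", base, time.Now().UnixNano()), "application/json", nil); err != nil {
		log.Fatal(err)
	}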