diff --git a/influxql/ast.go b/influxql/ast.go index b3718fda51..f3ff3bd359 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -719,11 +719,22 @@ func (s *SelectStatement) GroupByInterval() (time.Duration, error) { // SetTimeRange sets the start and end time of the select statement to [start, end). i.e. start inclusive, end exclusive. // This is used commonly for continuous queries so the start and end are in buckets. func (s *SelectStatement) SetTimeRange(start, end time.Time) error { - cond := fmt.Sprintf("time >= %ds AND time < %ds", start.Unix(), end.Unix()) + cond := fmt.Sprintf("time >= '%s' AND time < '%s'", start.Format(DateTimeFormat), end.Format(DateTimeFormat)) if s.Condition != nil { cond = fmt.Sprintf("%s AND %s", s.rewriteWithoutTimeDimensions(), cond) } + // cond = "" + // var filteredDims Dimensions + // for _, d := range s.Dimensions { + // if call, ok := d.Expr.(*Call); ok && strings.ToLower(call.Name) == "time" { + // // do nothing + // } else { + // filteredDims = append(filteredDims, d) + // } + // } + // s.Dimensions = filteredDims + expr, err := NewParser(strings.NewReader(cond)).ParseExpr() if err != nil { return err @@ -745,10 +756,13 @@ func (s *SelectStatement) rewriteWithoutTimeDimensions() string { return &BooleanLiteral{Val: true} } return n + case *Call: + return &BooleanLiteral{Val: true} default: return n } }) + return n.String() } @@ -1657,11 +1671,15 @@ func timeExprValue(ref Expr, lit Expr) time.Time { if ref, ok := ref.(*VarRef); ok && strings.ToLower(ref.Val) == "time" { switch lit := lit.(type) { case *TimeLiteral: + warn("timeExpr ", lit.Val.String()) return lit.Val case *DurationLiteral: return time.Unix(0, int64(lit.Val)).UTC() + default: + warn("timeExpr: ", lit.String()) } } + warn("timeExpr is nil") return time.Time{} } diff --git a/server.go b/server.go index 2293351e35..0261c8e3ac 100644 --- a/server.go +++ b/server.go @@ -3034,9 +3034,9 @@ func (s *Server) RunContinuousQueries() error { for _, d := range s.databases { for _, c := range d.continuousQueries { if s.shouldRunContinuousQuery(c) { - // go func(cq *ContinuousQuery) { - s.runContinuousQuery(c) - // }(c) + go func(cq *ContinuousQuery) { + s.runContinuousQuery(c) + }(c) } } } @@ -3126,7 +3126,7 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) { // runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { - log.Printf("cq run: %s\n", cq.cq.String()) + log.Printf("cq run: %s %s\n", cq.cq.Database, cq.cq.Source.String()) e, err := s.planSelectStatement(cq.cq.Source, cq.cq.Database) if err != nil { @@ -3144,6 +3144,7 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { db := "" retentionPolicy := "" for row := range ch { + warn("row: ", row) points, err := s.convertRowToPoints(row) if err != nil { log.Println(err) @@ -3158,6 +3159,7 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { } } } + warn("cq.run.write") return nil } diff --git a/server_test.go b/server_test.go index 2c2ad7f74f..515a203e95 100644 --- a/server_test.go +++ b/server_test.go @@ -1173,9 +1173,10 @@ func TestServer_RunContinuousQueries(t *testing.T) { if err := s.CreateDatabase("foo"); err != nil { t.Fatal(err) } - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw"}); err != nil { t.Fatal(err) } + s.SetDefaultRetentionPolicy("foo", "raw") s.RecomputePreviousN = 2 s.RecomputeNoOlderThan = 4 * time.Second @@ -1183,7 +1184,7 @@ func TestServer_RunContinuousQueries(t *testing.T) { s.ComputeNoMoreThan = 2 * time.Second // create and check - q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(5s) END" + q := `CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT mean(value) INTO cpu_region FROM "foo"."raw".cpu GROUP BY time(5s), region END` stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement() if err != nil { t.Fatalf("error parsing query %s", err.Error()) @@ -1193,26 +1194,43 @@ func TestServer_RunContinuousQueries(t *testing.T) { t.Fatalf("error creating continuous query %s", err.Error()) } + // Write series with one point to the database. + now := time.Now().UTC() + fmt.Println("TIME: ", now.UTC().Format(influxql.DateTimeFormat)) + s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: now, Values: map[string]interface{}{"value": float64(20)}}}) + s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: now, Values: map[string]interface{}{"value": float64(30)}}}) + s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west"}, Timestamp: now, Values: map[string]interface{}{"value": float64(100)}}}) + // TODO: figure out how to actually test this - t.Skip("pending") - // fmt.Println("CQ 1") - // s.RunContinuousQueries() - // fmt.Println("CQ 2") - // s.RunContinuousQueries() - // time.Sleep(time.Second * 2) - // fmt.Println("CQ 3") - // s.RunContinuousQueries() - // fmt.Println("CQ 4") - // s.RunContinuousQueries() - // time.Sleep(time.Second * 3) - // fmt.Println("CQ 5") - // s.RunContinuousQueries() - // time.Sleep(time.Second * 3) - // fmt.Println("CQ 6") - // s.RunContinuousQueries() - // time.Sleep(time.Second * 3) - // fmt.Println("CQ 7") - // s.RunContinuousQueries() + // t.Skip("pending") + fmt.Println("CQ 1") + s.RunContinuousQueries() + fmt.Println("CQ 2") + s.RunContinuousQueries() + time.Sleep(time.Second * 2) + // Select data from the server. + results := s.ExecuteQuery(MustParseQuery(`SELECT value, region FROM "foo"."raw".cpu_region GROUP BY region`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 2 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"\"foo\".\"raw\".cpu","tags":{"region":"us-east"},"columns":["time","sum"],"values":[[946684800000000,20],[946684810000000,30]]},{"name":"\"foo\".\"raw\".cpu","tags":{"region":"us-west"},"columns":["time","sum"],"values":[[946684800000000,100]]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } + + fmt.Println("CQ 3") + s.RunContinuousQueries() + fmt.Println("CQ 4") + s.RunContinuousQueries() + time.Sleep(time.Second * 3) + fmt.Println("CQ 5") + s.RunContinuousQueries() + time.Sleep(time.Second * 3) + fmt.Println("CQ 6") + s.RunContinuousQueries() + time.Sleep(time.Second * 3) + fmt.Println("CQ 7") + s.RunContinuousQueries() } func mustMarshalJSON(v interface{}) string { diff --git a/tx.go b/tx.go index fccf93c8bc..6258dd9a04 100644 --- a/tx.go +++ b/tx.go @@ -78,6 +78,7 @@ func (tx *tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterat // Grab time range from statement. tmin, tmax := influxql.TimeRange(stmt.Condition) + warn("range: ", tmin.Format(influxql.DateTimeFormat), tmax.Format(influxql.DateTimeFormat)) if tmin.IsZero() { tmin = time.Unix(0, 1) }