diff --git a/influxql/ast.go b/influxql/ast.go
index 7f2d7f9c3b..92dd1041ca 100644
--- a/influxql/ast.go
+++ b/influxql/ast.go
@@ -1610,7 +1610,6 @@ func TimeRange(expr Expr) (min, max time.Time) {
 			// Otherwise check for for the right-hand side and flip the operator.
 			value, op := timeExprValue(n.LHS, n.RHS), n.Op
 			if value.IsZero() {
-				return
 				if value = timeExprValue(n.RHS, n.LHS); value.IsZero() {
 					return
 				} else if op == LT {
diff --git a/server.go b/server.go
index 0f2f3aa4a4..d23d60da85 100644
--- a/server.go
+++ b/server.go
@@ -11,7 +11,6 @@ import (
 	"net/url"
 	"os"
 	"path/filepath"
-	"reflect"
 	"regexp"
 	"sort"
 	"strconv"
@@ -3162,10 +3161,7 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
 
 // runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
 func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
-	warn("> cq run: ", cq.cq.Database, cq.cq.Source.String())
-
 	e, err := s.planSelectStatement(cq.cq.Source, cq.cq.Database)
-	warn("> planned")
 
 	if err != nil {
 		return err
@@ -3178,35 +3174,27 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
 	}
 
 	// Read all rows from channel and write them in
-	// TODO paul: fill in db and retention policy when CQ parsing gets updated
-	warn("cq.start.empty.ch")
 	for row := range ch {
-		warn("row: ", row)
-		points, err := s.convertRowToPoints(row)
+		points, err := s.convertRowToPoints(cq.intoMeasurement, row)
 		if err != nil {
 			log.Println(err)
 			continue
 		}
 
-		for _, p := range points {
-			warn("> ", p)
-		}
 		if len(points) > 0 {
 			_, err = s.WriteSeries(cq.intoDB, cq.intoRP, points)
 			if err != nil {
-				log.Printf("cq err: %s", err)
+				log.Printf("[cq] err: %s", err)
 			}
-		} else {
-			warn("> empty points")
 		}
 	}
-	warn("cq.run.write")
 
 	return nil
 }
 
-// convertRowToPoints will convert a query result Row into Points that can be written back in
-func (s *Server) convertRowToPoints(row *influxql.Row) ([]Point, error) {
+// convertRowToPoints will convert a query result Row into Points that can be written back in.
+// Used for continuous and INTO queries
+func (s *Server) convertRowToPoints(measurementName string, row *influxql.Row) ([]Point, error) {
 	// figure out which parts of the result are the time and which are the fields
 	timeIndex := -1
 	fieldIndexes := make(map[string]int)
@@ -3229,11 +3217,8 @@ func (s *Server) convertRowToPoints(row *influxql.Row) ([]Point, error) {
 			vals[fieldName] = v[fieldIndex]
 		}
 
-		warn("> ", row)
-		warn("> ", reflect.TypeOf(v[timeIndex]))
-		warn("> ", vals)
 		p := &Point{
-			Name:      row.Name,
+			Name:      measurementName,
 			Tags:      row.Tags,
 			Timestamp: v[timeIndex].(time.Time),
 			Values:    vals,
diff --git a/server_test.go b/server_test.go
index 0ca58f5d63..cc02ff646a 100644
--- a/server_test.go
+++ b/server_test.go
@@ -1179,13 +1179,13 @@ func TestServer_RunContinuousQueries(t *testing.T) {
 	}
 	s.SetDefaultRetentionPolicy("foo", "raw")
-	s.RecomputePreviousN = 2
-	s.RecomputeNoOlderThan = 4 * time.Second
+	s.RecomputePreviousN = 50
+	s.RecomputeNoOlderThan = time.Second
 	s.ComputeRunsPerInterval = 5
-	s.ComputeNoMoreThan = 2 * time.Second
+	s.ComputeNoMoreThan = 2 * time.Millisecond
 
 	// create cq and check
-	q := `CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT mean(value) INTO cpu_region FROM cpu GROUP BY time(5s), region END`
+	q := `CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT mean(value) INTO cpu_region FROM cpu GROUP BY time(5ms), region END`
 	stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
 	if err != nil {
 		t.Fatalf("error parsing query %s", err.Error())
 	}
@@ -1198,50 +1198,49 @@ func TestServer_RunContinuousQueries(t *testing.T) {
 		t.Fatalf("error running cqs when no data exists: %s", err.Error())
 	}
 
-	// Write series with one point to the database.
-	now := time.Now().UTC()
-	s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: now, Values: map[string]interface{}{"value": float64(20)}}})
-	s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: now, Values: map[string]interface{}{"value": float64(30)}}})
-	s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west"}, Timestamp: now, Values: map[string]interface{}{"value": float64(100)}}})
+	// set a test time in the middle of a 5 millisecond interval that we can work with
+	testTime := time.Now().UTC().Round(5 * time.Millisecond)
+	if testTime.UnixNano() > time.Now().UnixNano() {
+		testTime = testTime.Add(-5 * time.Millisecond)
+	}
+	testTime = testTime.Add(time.Millisecond * 2)
 
-	start := time.Now().Round(time.Minute * 5).Add(-time.Minute * 5)
-	end := start.Add(time.Minute * 5)
-	cond := fmt.Sprintf("time >= '%s' AND time < '%s'", start.UTC().Format(time.RFC3339Nano), end.UTC().Format(time.RFC3339Nano))
-	q1, _ := influxql.NewParser(strings.NewReader(fmt.Sprintf(`SELECT mean(value) FROM "foo"."raw"."cpu" WHERE %s GROUP BY time(5s), region`, cond))).ParseQuery()
-	fmt.Println("ASDF: ", q1.String())
-	r1 := s.ExecuteQuery(q1, "foo", nil)
-	fmt.Println("RESULTS: ", r1.Results[0])
+	s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: testTime, Values: map[string]interface{}{"value": float64(30)}}})
+	s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: testTime.Add(-time.Millisecond * 5), Values: map[string]interface{}{"value": float64(20)}}})
+	s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west"}, Timestamp: testTime, Values: map[string]interface{}{"value": float64(100)}}})
 
-	time.Sleep(time.Second * 2)
-	// TODO: figure out how to actually test this
-	fmt.Println("CQ 1")
+	// Run CQs after a period of time
+	time.Sleep(time.Millisecond * 50)
 	s.RunContinuousQueries()
-	fmt.Println("CQ 2")
-	s.RunContinuousQueries()
-	time.Sleep(time.Second * 2)
 
-	// Select data from the server.
-	results := s.ExecuteQuery(MustParseQuery(`SELECT value, region FROM "foo"."raw".cpu_region GROUP BY region`), "foo", nil)
-	if res := results.Results[0]; res.Err != nil {
-		t.Fatalf("unexpected error: %s", res.Err)
-	} else if len(res.Rows) != 2 {
-		t.Fatalf("unexpected row count: %d", len(res.Rows))
-	} else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"\"foo\".\"raw\".cpu","tags":{"region":"us-east"},"columns":["time","sum"],"values":[[946684800000000,20],[946684810000000,30]]},{"name":"\"foo\".\"raw\".cpu","tags":{"region":"us-west"},"columns":["time","sum"],"values":[[946684800000000,100]]}]}` {
-		t.Fatalf("unexpected row(0): %s", s)
+	// give the CQs time to run
+	time.Sleep(time.Millisecond * 100)
+
+	verify := func(num int, exp string) {
+		results := s.ExecuteQuery(MustParseQuery(`SELECT mean(mean) FROM cpu_region GROUP BY region`), "foo", nil)
+		if res := results.Results[0]; res.Err != nil {
+			t.Fatalf("unexpected error verify %d: %s", num, res.Err)
+		} else if len(res.Rows) != 2 {
+			t.Fatalf("unexpected row count on verify %d: %d", num, len(res.Rows))
+		} else if s := mustMarshalJSON(res); s != exp {
+			t.Fatalf("unexpected row(0) on verify %d: %s", num, s)
+		}
 	}
-	fmt.Println("CQ 3")
+	// ensure CQ results were saved
+	verify(1, `{"rows":[{"name":"cpu_region","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]},{"name":"cpu_region","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",100]]}]}`)
+
+	// ensure that repeated runs don't cause duplicate data
 	s.RunContinuousQueries()
-	fmt.Println("CQ 4")
-	s.RunContinuousQueries()
-	time.Sleep(time.Second * 3)
-	fmt.Println("CQ 5")
-	s.RunContinuousQueries()
-	time.Sleep(time.Second * 3)
-	fmt.Println("CQ 6")
-	s.RunContinuousQueries()
-	time.Sleep(time.Second * 3)
-	fmt.Println("CQ 7")
+	verify(2, `{"rows":[{"name":"cpu_region","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]},{"name":"cpu_region","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",100]]}]}`)
+
+	// ensure that data written into a previous window is picked up and the result recomputed.
+	time.Sleep(time.Millisecond * 2)
+	s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west"}, Timestamp: testTime.Add(-time.Millisecond), Values: map[string]interface{}{"value": float64(50)}}})
 	s.RunContinuousQueries()
+	// give CQs time to run
+	time.Sleep(time.Millisecond * 50)
+
+	verify(3, `{"rows":[{"name":"cpu_region","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]},{"name":"cpu_region","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",75]]}]}`)
 }
 
 func mustMarshalJSON(v interface{}) string {
diff --git a/tx.go b/tx.go
index 6258dd9a04..fccf93c8bc 100644
--- a/tx.go
+++ b/tx.go
@@ -78,7 +78,6 @@ func (tx *tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterat
 
 	// Grab time range from statement.
 	tmin, tmax := influxql.TimeRange(stmt.Condition)
-	warn("range: ", tmin.Format(influxql.DateTimeFormat), tmax.Format(influxql.DateTimeFormat))
 	if tmin.IsZero() {
 		tmin = time.Unix(0, 1)
 	}
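
For readers skimming the patch, the heart of the server.go change is that points generated from a continuous query's result rows are now named after the query's INTO measurement (passed in as `measurementName`) rather than the source row's name. The standalone sketch below, which is not part of the patch, illustrates that conversion under simplified assumptions: the `Row` and `Point` structs here are stand-ins for the repository's `influxql.Row` and `Point` types, not their actual definitions.

```go
package main

import (
	"fmt"
	"time"
)

// Row and Point are simplified stand-ins for the repository's types.
type Row struct {
	Name    string
	Tags    map[string]string
	Columns []string
	Values  [][]interface{}
}

type Point struct {
	Name      string
	Tags      map[string]string
	Timestamp time.Time
	Values    map[string]interface{}
}

// convertRowToPoints sketches the conversion: locate the "time" column, treat
// the remaining columns as fields, and name each point after the target
// measurement rather than after row.Name.
func convertRowToPoints(measurementName string, row *Row) ([]Point, error) {
	timeIndex := -1
	fieldIndexes := map[string]int{}
	for i, c := range row.Columns {
		if c == "time" {
			timeIndex = i
		} else {
			fieldIndexes[c] = i
		}
	}
	if timeIndex == -1 {
		return nil, fmt.Errorf("row has no time column")
	}

	points := make([]Point, 0, len(row.Values))
	for _, v := range row.Values {
		vals := map[string]interface{}{}
		for name, idx := range fieldIndexes {
			vals[name] = v[idx]
		}
		points = append(points, Point{
			Name:      measurementName, // the CQ's INTO measurement, not row.Name
			Tags:      row.Tags,
			Timestamp: v[timeIndex].(time.Time),
			Values:    vals,
		})
	}
	return points, nil
}

func main() {
	row := &Row{
		Name:    `"foo"."raw".cpu`,
		Tags:    map[string]string{"region": "us-east"},
		Columns: []string{"time", "mean"},
		Values:  [][]interface{}{{time.Unix(0, 0).UTC(), 25.0}},
	}
	points, err := convertRowToPoints("cpu_region", row)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", points[0])
}
```

Passing the measurement name explicitly is what lets a continuous query write its `mean(value)` results into `cpu_region` in the test above, even though the source rows are still named after the raw `cpu` series.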