WIP: fix cq query times

pull/1285/head
Paul Dix 2015-01-28 00:11:48 -05:00
parent 7269647bfd
commit b0d4b6da55
4 changed files with 65 additions and 26 deletions

View File

@ -719,11 +719,22 @@ func (s *SelectStatement) GroupByInterval() (time.Duration, error) {
// SetTimeRange sets the start and end time of the select statement to [start, end). i.e. start inclusive, end exclusive.
// This is used commonly for continuous queries so the start and end are in buckets.
func (s *SelectStatement) SetTimeRange(start, end time.Time) error {
cond := fmt.Sprintf("time >= %ds AND time < %ds", start.Unix(), end.Unix())
cond := fmt.Sprintf("time >= '%s' AND time < '%s'", start.Format(DateTimeFormat), end.Format(DateTimeFormat))
if s.Condition != nil {
cond = fmt.Sprintf("%s AND %s", s.rewriteWithoutTimeDimensions(), cond)
}
// cond = ""
// var filteredDims Dimensions
// for _, d := range s.Dimensions {
// if call, ok := d.Expr.(*Call); ok && strings.ToLower(call.Name) == "time" {
// // do nothing
// } else {
// filteredDims = append(filteredDims, d)
// }
// }
// s.Dimensions = filteredDims
expr, err := NewParser(strings.NewReader(cond)).ParseExpr()
if err != nil {
return err
@ -745,10 +756,13 @@ func (s *SelectStatement) rewriteWithoutTimeDimensions() string {
return &BooleanLiteral{Val: true}
}
return n
case *Call:
return &BooleanLiteral{Val: true}
default:
return n
}
})
return n.String()
}
@ -1657,11 +1671,15 @@ func timeExprValue(ref Expr, lit Expr) time.Time {
if ref, ok := ref.(*VarRef); ok && strings.ToLower(ref.Val) == "time" {
switch lit := lit.(type) {
case *TimeLiteral:
warn("timeExpr ", lit.Val.String())
return lit.Val
case *DurationLiteral:
return time.Unix(0, int64(lit.Val)).UTC()
default:
warn("timeExpr: ", lit.String())
}
}
warn("timeExpr is nil")
return time.Time{}
}

View File

@ -3034,9 +3034,9 @@ func (s *Server) RunContinuousQueries() error {
for _, d := range s.databases {
for _, c := range d.continuousQueries {
if s.shouldRunContinuousQuery(c) {
// go func(cq *ContinuousQuery) {
s.runContinuousQuery(c)
// }(c)
go func(cq *ContinuousQuery) {
s.runContinuousQuery(c)
}(c)
}
}
}
@ -3126,7 +3126,7 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
log.Printf("cq run: %s\n", cq.cq.String())
log.Printf("cq run: %s %s\n", cq.cq.Database, cq.cq.Source.String())
e, err := s.planSelectStatement(cq.cq.Source, cq.cq.Database)
if err != nil {
@ -3144,6 +3144,7 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
db := ""
retentionPolicy := ""
for row := range ch {
warn("row: ", row)
points, err := s.convertRowToPoints(row)
if err != nil {
log.Println(err)
@ -3158,6 +3159,7 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
}
}
}
warn("cq.run.write")
return nil
}

View File

@ -1173,9 +1173,10 @@ func TestServer_RunContinuousQueries(t *testing.T) {
if err := s.CreateDatabase("foo"); err != nil {
t.Fatal(err)
}
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw"}); err != nil {
t.Fatal(err)
}
s.SetDefaultRetentionPolicy("foo", "raw")
s.RecomputePreviousN = 2
s.RecomputeNoOlderThan = 4 * time.Second
@ -1183,7 +1184,7 @@ func TestServer_RunContinuousQueries(t *testing.T) {
s.ComputeNoMoreThan = 2 * time.Second
// create and check
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(5s) END"
q := `CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT mean(value) INTO cpu_region FROM "foo"."raw".cpu GROUP BY time(5s), region END`
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
if err != nil {
t.Fatalf("error parsing query %s", err.Error())
@ -1193,26 +1194,43 @@ func TestServer_RunContinuousQueries(t *testing.T) {
t.Fatalf("error creating continuous query %s", err.Error())
}
// Write series with one point to the database.
now := time.Now().UTC()
fmt.Println("TIME: ", now.UTC().Format(influxql.DateTimeFormat))
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: now, Values: map[string]interface{}{"value": float64(20)}}})
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: now, Values: map[string]interface{}{"value": float64(30)}}})
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west"}, Timestamp: now, Values: map[string]interface{}{"value": float64(100)}}})
// TODO: figure out how to actually test this
t.Skip("pending")
// fmt.Println("CQ 1")
// s.RunContinuousQueries()
// fmt.Println("CQ 2")
// s.RunContinuousQueries()
// time.Sleep(time.Second * 2)
// fmt.Println("CQ 3")
// s.RunContinuousQueries()
// fmt.Println("CQ 4")
// s.RunContinuousQueries()
// time.Sleep(time.Second * 3)
// fmt.Println("CQ 5")
// s.RunContinuousQueries()
// time.Sleep(time.Second * 3)
// fmt.Println("CQ 6")
// s.RunContinuousQueries()
// time.Sleep(time.Second * 3)
// fmt.Println("CQ 7")
// s.RunContinuousQueries()
// t.Skip("pending")
fmt.Println("CQ 1")
s.RunContinuousQueries()
fmt.Println("CQ 2")
s.RunContinuousQueries()
time.Sleep(time.Second * 2)
// Select data from the server.
results := s.ExecuteQuery(MustParseQuery(`SELECT value, region FROM "foo"."raw".cpu_region GROUP BY region`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Rows) != 2 {
t.Fatalf("unexpected row count: %d", len(res.Rows))
} else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"\"foo\".\"raw\".cpu","tags":{"region":"us-east"},"columns":["time","sum"],"values":[[946684800000000,20],[946684810000000,30]]},{"name":"\"foo\".\"raw\".cpu","tags":{"region":"us-west"},"columns":["time","sum"],"values":[[946684800000000,100]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
fmt.Println("CQ 3")
s.RunContinuousQueries()
fmt.Println("CQ 4")
s.RunContinuousQueries()
time.Sleep(time.Second * 3)
fmt.Println("CQ 5")
s.RunContinuousQueries()
time.Sleep(time.Second * 3)
fmt.Println("CQ 6")
s.RunContinuousQueries()
time.Sleep(time.Second * 3)
fmt.Println("CQ 7")
s.RunContinuousQueries()
}
func mustMarshalJSON(v interface{}) string {

1
tx.go
View File

@ -78,6 +78,7 @@ func (tx *tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterat
// Grab time range from statement.
tmin, tmax := influxql.TimeRange(stmt.Condition)
warn("range: ", tmin.Format(influxql.DateTimeFormat), tmax.Format(influxql.DateTimeFormat))
if tmin.IsZero() {
tmin = time.Unix(0, 1)
}