Finish wiring up basic version of CQs
parent
fec6764b09
commit
8d9bcdbc97
|
@ -1610,7 +1610,6 @@ func TimeRange(expr Expr) (min, max time.Time) {
|
||||||
// Otherwise check for for the right-hand side and flip the operator.
|
// Otherwise check for for the right-hand side and flip the operator.
|
||||||
value, op := timeExprValue(n.LHS, n.RHS), n.Op
|
value, op := timeExprValue(n.LHS, n.RHS), n.Op
|
||||||
if value.IsZero() {
|
if value.IsZero() {
|
||||||
return
|
|
||||||
if value = timeExprValue(n.RHS, n.LHS); value.IsZero() {
|
if value = timeExprValue(n.RHS, n.LHS); value.IsZero() {
|
||||||
return
|
return
|
||||||
} else if op == LT {
|
} else if op == LT {
|
||||||
|
|
27
server.go
27
server.go
|
@ -11,7 +11,6 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"reflect"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -3162,10 +3161,7 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
|
||||||
|
|
||||||
// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
|
// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
|
||||||
func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
||||||
warn("> cq run: ", cq.cq.Database, cq.cq.Source.String())
|
|
||||||
|
|
||||||
e, err := s.planSelectStatement(cq.cq.Source, cq.cq.Database)
|
e, err := s.planSelectStatement(cq.cq.Source, cq.cq.Database)
|
||||||
warn("> planned")
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -3178,35 +3174,27 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read all rows from channel and write them in
|
// Read all rows from channel and write them in
|
||||||
// TODO paul: fill in db and retention policy when CQ parsing gets updated
|
|
||||||
warn("cq.start.empty.ch")
|
|
||||||
for row := range ch {
|
for row := range ch {
|
||||||
warn("row: ", row)
|
points, err := s.convertRowToPoints(cq.intoMeasurement, row)
|
||||||
points, err := s.convertRowToPoints(row)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, p := range points {
|
|
||||||
warn("> ", p)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(points) > 0 {
|
if len(points) > 0 {
|
||||||
_, err = s.WriteSeries(cq.intoDB, cq.intoRP, points)
|
_, err = s.WriteSeries(cq.intoDB, cq.intoRP, points)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("cq err: %s", err)
|
log.Printf("[cq] err: %s", err)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
warn("> empty points")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
warn("cq.run.write")
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// convertRowToPoints will convert a query result Row into Points that can be written back in
|
// convertRowToPoints will convert a query result Row into Points that can be written back in.
|
||||||
func (s *Server) convertRowToPoints(row *influxql.Row) ([]Point, error) {
|
// Used for continuous and INTO queries
|
||||||
|
func (s *Server) convertRowToPoints(measurementName string, row *influxql.Row) ([]Point, error) {
|
||||||
// figure out which parts of the result are the time and which are the fields
|
// figure out which parts of the result are the time and which are the fields
|
||||||
timeIndex := -1
|
timeIndex := -1
|
||||||
fieldIndexes := make(map[string]int)
|
fieldIndexes := make(map[string]int)
|
||||||
|
@ -3229,11 +3217,8 @@ func (s *Server) convertRowToPoints(row *influxql.Row) ([]Point, error) {
|
||||||
vals[fieldName] = v[fieldIndex]
|
vals[fieldName] = v[fieldIndex]
|
||||||
}
|
}
|
||||||
|
|
||||||
warn("> ", row)
|
|
||||||
warn("> ", reflect.TypeOf(v[timeIndex]))
|
|
||||||
warn("> ", vals)
|
|
||||||
p := &Point{
|
p := &Point{
|
||||||
Name: row.Name,
|
Name: measurementName,
|
||||||
Tags: row.Tags,
|
Tags: row.Tags,
|
||||||
Timestamp: v[timeIndex].(time.Time),
|
Timestamp: v[timeIndex].(time.Time),
|
||||||
Values: vals,
|
Values: vals,
|
||||||
|
|
|
@ -1179,13 +1179,13 @@ func TestServer_RunContinuousQueries(t *testing.T) {
|
||||||
}
|
}
|
||||||
s.SetDefaultRetentionPolicy("foo", "raw")
|
s.SetDefaultRetentionPolicy("foo", "raw")
|
||||||
|
|
||||||
s.RecomputePreviousN = 2
|
s.RecomputePreviousN = 50
|
||||||
s.RecomputeNoOlderThan = 4 * time.Second
|
s.RecomputeNoOlderThan = time.Second
|
||||||
s.ComputeRunsPerInterval = 5
|
s.ComputeRunsPerInterval = 5
|
||||||
s.ComputeNoMoreThan = 2 * time.Second
|
s.ComputeNoMoreThan = 2 * time.Millisecond
|
||||||
|
|
||||||
// create cq and check
|
// create cq and check
|
||||||
q := `CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT mean(value) INTO cpu_region FROM cpu GROUP BY time(5s), region END`
|
q := `CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT mean(value) INTO cpu_region FROM cpu GROUP BY time(5ms), region END`
|
||||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error parsing query %s", err.Error())
|
t.Fatalf("error parsing query %s", err.Error())
|
||||||
|
@ -1198,50 +1198,49 @@ func TestServer_RunContinuousQueries(t *testing.T) {
|
||||||
t.Fatalf("error running cqs when no data exists: %s", err.Error())
|
t.Fatalf("error running cqs when no data exists: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write series with one point to the database.
|
// set a test time in the middle of a 5 second interval that we can work with
|
||||||
now := time.Now().UTC()
|
testTime := time.Now().UTC().Round(5 * time.Millisecond)
|
||||||
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: now, Values: map[string]interface{}{"value": float64(20)}}})
|
if testTime.UnixNano() > time.Now().UnixNano() {
|
||||||
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: now, Values: map[string]interface{}{"value": float64(30)}}})
|
testTime = testTime.Add(-5 * time.Millisecond)
|
||||||
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west"}, Timestamp: now, Values: map[string]interface{}{"value": float64(100)}}})
|
}
|
||||||
|
testTime.Add(time.Millisecond * 2)
|
||||||
|
|
||||||
start := time.Now().Round(time.Minute * 5).Add(-time.Minute * 5)
|
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: testTime, Values: map[string]interface{}{"value": float64(30)}}})
|
||||||
end := start.Add(time.Minute * 5)
|
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: testTime.Add(-time.Millisecond * 5), Values: map[string]interface{}{"value": float64(20)}}})
|
||||||
cond := fmt.Sprintf("time >= '%s' AND time < '%s'", start.UTC().Format(time.RFC3339Nano), end.UTC().Format(time.RFC3339Nano))
|
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west"}, Timestamp: testTime, Values: map[string]interface{}{"value": float64(100)}}})
|
||||||
q1, _ := influxql.NewParser(strings.NewReader(fmt.Sprintf(`SELECT mean(value) FROM "foo"."raw"."cpu" WHERE %s GROUP BY time(5s), region`, cond))).ParseQuery()
|
|
||||||
fmt.Println("ASDF: ", q1.String())
|
|
||||||
r1 := s.ExecuteQuery(q1, "foo", nil)
|
|
||||||
fmt.Println("RESULTS: ", r1.Results[0])
|
|
||||||
|
|
||||||
time.Sleep(time.Second * 2)
|
// Run CQs after a period of time
|
||||||
// TODO: figure out how to actually test this
|
time.Sleep(time.Millisecond * 50)
|
||||||
fmt.Println("CQ 1")
|
|
||||||
s.RunContinuousQueries()
|
s.RunContinuousQueries()
|
||||||
fmt.Println("CQ 2")
|
// give the CQs time to run
|
||||||
s.RunContinuousQueries()
|
time.Sleep(time.Millisecond * 100)
|
||||||
time.Sleep(time.Second * 2)
|
|
||||||
// Select data from the server.
|
verify := func(num int, exp string) {
|
||||||
results := s.ExecuteQuery(MustParseQuery(`SELECT value, region FROM "foo"."raw".cpu_region GROUP BY region`), "foo", nil)
|
results := s.ExecuteQuery(MustParseQuery(`SELECT mean(mean) FROM cpu_region GROUP BY region`), "foo", nil)
|
||||||
if res := results.Results[0]; res.Err != nil {
|
if res := results.Results[0]; res.Err != nil {
|
||||||
t.Fatalf("unexpected error: %s", res.Err)
|
t.Fatalf("unexpected error verify %d: %s", num, res.Err)
|
||||||
} else if len(res.Rows) != 2 {
|
} else if len(res.Rows) != 2 {
|
||||||
t.Fatalf("unexpected row count: %d", len(res.Rows))
|
t.Fatalf("unexpected row count on verify %d: %d", num, len(res.Rows))
|
||||||
} else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"\"foo\".\"raw\".cpu","tags":{"region":"us-east"},"columns":["time","sum"],"values":[[946684800000000,20],[946684810000000,30]]},{"name":"\"foo\".\"raw\".cpu","tags":{"region":"us-west"},"columns":["time","sum"],"values":[[946684800000000,100]]}]}` {
|
} else if s := mustMarshalJSON(res); s != exp {
|
||||||
t.Fatalf("unexpected row(0): %s", s)
|
t.Fatalf("unexpected row(0) on verify %d: %s", num, s)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("CQ 3")
|
// ensure CQ results were saved
|
||||||
|
verify(1, `{"rows":[{"name":"cpu_region","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]},{"name":"cpu_region","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",100]]}]}`)
|
||||||
|
|
||||||
|
// ensure that repeated runs don't cause duplicate data
|
||||||
s.RunContinuousQueries()
|
s.RunContinuousQueries()
|
||||||
fmt.Println("CQ 4")
|
verify(2, `{"rows":[{"name":"cpu_region","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]},{"name":"cpu_region","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",100]]}]}`)
|
||||||
s.RunContinuousQueries()
|
|
||||||
time.Sleep(time.Second * 3)
|
// ensure that data written into a previous window is picked up and the result recomputed.
|
||||||
fmt.Println("CQ 5")
|
time.Sleep(time.Millisecond * 2)
|
||||||
s.RunContinuousQueries()
|
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west"}, Timestamp: testTime.Add(-time.Millisecond), Values: map[string]interface{}{"value": float64(50)}}})
|
||||||
time.Sleep(time.Second * 3)
|
|
||||||
fmt.Println("CQ 6")
|
|
||||||
s.RunContinuousQueries()
|
|
||||||
time.Sleep(time.Second * 3)
|
|
||||||
fmt.Println("CQ 7")
|
|
||||||
s.RunContinuousQueries()
|
s.RunContinuousQueries()
|
||||||
|
// give CQs time to run
|
||||||
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
|
||||||
|
verify(3, `{"rows":[{"name":"cpu_region","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]},{"name":"cpu_region","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",75]]}]}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustMarshalJSON(v interface{}) string {
|
func mustMarshalJSON(v interface{}) string {
|
||||||
|
|
1
tx.go
1
tx.go
|
@ -78,7 +78,6 @@ func (tx *tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterat
|
||||||
|
|
||||||
// Grab time range from statement.
|
// Grab time range from statement.
|
||||||
tmin, tmax := influxql.TimeRange(stmt.Condition)
|
tmin, tmax := influxql.TimeRange(stmt.Condition)
|
||||||
warn("range: ", tmin.Format(influxql.DateTimeFormat), tmax.Format(influxql.DateTimeFormat))
|
|
||||||
if tmin.IsZero() {
|
if tmin.IsZero() {
|
||||||
tmin = time.Unix(0, 1)
|
tmin = time.Unix(0, 1)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue