WIP: wire up continuous queries
* Fix run to work with CQ broker * Fix CQ handler * Fix SetTimeRange to use RFC3339Nano * Fix the time range tests * Fix the parser to parse for RFC3339Nano literals in addition to the other format * Add logic for running CQs * Remove duplicate WriteBufferSize default setting from configpull/1285/head
parent
2d08d6e288
commit
fec6764b09
|
@ -44,7 +44,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
|
|||
// Start the broker handler.
|
||||
var h *Handler
|
||||
if b != nil {
|
||||
h = &Handler{brokerHandler: messaging.NewHandler(b)}
|
||||
h = &Handler{brokerHandler: messaging.NewHandler(b.Broker)}
|
||||
// We want to make sure we are spun up before we exit this function, so we manually listen and serve
|
||||
listener, err := net.Listen("tcp", config.BrokerAddr())
|
||||
if err != nil {
|
||||
|
|
|
@ -329,7 +329,7 @@ func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request) {
|
|||
// serveProcessContinuousQueries will execute any continuous queries that should be run
|
||||
func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, u *influxdb.User) {
|
||||
if err := h.server.RunContinuousQueries(); err != nil {
|
||||
httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
httpError(w, err.Error(), false, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -719,22 +719,11 @@ func (s *SelectStatement) GroupByInterval() (time.Duration, error) {
|
|||
// SetTimeRange sets the start and end time of the select statement to [start, end). i.e. start inclusive, end exclusive.
|
||||
// This is used commonly for continuous queries so the start and end are in buckets.
|
||||
func (s *SelectStatement) SetTimeRange(start, end time.Time) error {
|
||||
cond := fmt.Sprintf("time >= '%s' AND time < '%s'", start.Format(DateTimeFormat), end.Format(DateTimeFormat))
|
||||
cond := fmt.Sprintf("time >= '%s' AND time < '%s'", start.UTC().Format(time.RFC3339Nano), end.UTC().Format(time.RFC3339Nano))
|
||||
if s.Condition != nil {
|
||||
cond = fmt.Sprintf("%s AND %s", s.rewriteWithoutTimeDimensions(), cond)
|
||||
}
|
||||
|
||||
// cond = ""
|
||||
// var filteredDims Dimensions
|
||||
// for _, d := range s.Dimensions {
|
||||
// if call, ok := d.Expr.(*Call); ok && strings.ToLower(call.Name) == "time" {
|
||||
// // do nothing
|
||||
// } else {
|
||||
// filteredDims = append(filteredDims, d)
|
||||
// }
|
||||
// }
|
||||
// s.Dimensions = filteredDims
|
||||
|
||||
expr, err := NewParser(strings.NewReader(cond)).ParseExpr()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -893,6 +882,7 @@ func MatchSource(src Source, name string) string {
|
|||
return ""
|
||||
}
|
||||
|
||||
// TODO pauldix: Target should actually have a Database, RetentionPolicy, and Measurement. These should be set based on the ON part of the query, and the SplitIdent of the INTO name
|
||||
// Target represents a target (destination) policy, measurment, and DB.
|
||||
type Target struct {
|
||||
// Measurement to write into.
|
||||
|
@ -1620,6 +1610,7 @@ func TimeRange(expr Expr) (min, max time.Time) {
|
|||
// Otherwise check for for the right-hand side and flip the operator.
|
||||
value, op := timeExprValue(n.LHS, n.RHS), n.Op
|
||||
if value.IsZero() {
|
||||
return
|
||||
if value = timeExprValue(n.RHS, n.LHS); value.IsZero() {
|
||||
return
|
||||
} else if op == LT {
|
||||
|
@ -1671,15 +1662,11 @@ func timeExprValue(ref Expr, lit Expr) time.Time {
|
|||
if ref, ok := ref.(*VarRef); ok && strings.ToLower(ref.Val) == "time" {
|
||||
switch lit := lit.(type) {
|
||||
case *TimeLiteral:
|
||||
warn("timeExpr ", lit.Val.String())
|
||||
return lit.Val
|
||||
case *DurationLiteral:
|
||||
return time.Unix(0, int64(lit.Val)).UTC()
|
||||
default:
|
||||
warn("timeExpr: ", lit.String())
|
||||
}
|
||||
}
|
||||
warn("timeExpr is nil")
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
|
|
|
@ -278,19 +278,19 @@ func TestSelectStatement_OnlyTimeDimensions(t *testing.T) {
|
|||
exp: false,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05'`,
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05Z'`,
|
||||
exp: true,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05' AND time < '2000-01-01T00:00:05'`,
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:05Z'`,
|
||||
exp: true,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05' AND asdf = 'bar'`,
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05Z' AND asdf = 'bar'`,
|
||||
exp: false,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE asdf = 'jkl' AND (time >= '2000-01-01T00:00:05' AND time < '2000-01-01T00:00:05')`,
|
||||
stmt: `SELECT value FROM foo WHERE asdf = 'jkl' AND (time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:05Z')`,
|
||||
exp: false,
|
||||
},
|
||||
}
|
||||
|
|
|
@ -319,7 +319,7 @@ loop:
|
|||
for _, row := range rows {
|
||||
for _, values := range row.Values {
|
||||
t := time.Unix(0, values[0].(int64))
|
||||
values[0] = t.UTC().Format(time.RFC3339Nano)
|
||||
values[0] = t.UTC()
|
||||
}
|
||||
a = append(a, row)
|
||||
}
|
||||
|
|
|
@ -1481,7 +1481,12 @@ func (p *Parser) parseUnaryExpr() (Expr, error) {
|
|||
if isDateTimeString(lit) {
|
||||
t, err := time.Parse(DateTimeFormat, lit)
|
||||
if err != nil {
|
||||
return nil, &ParseError{Message: "unable to parse datetime", Pos: pos}
|
||||
// try to parse it as an RFCNano time
|
||||
t, err := time.Parse(time.RFC3339Nano, lit)
|
||||
if err != nil {
|
||||
return nil, &ParseError{Message: "unable to parse datetime", Pos: pos}
|
||||
}
|
||||
return &TimeLiteral{Val: t}, nil
|
||||
}
|
||||
return &TimeLiteral{Val: t}, nil
|
||||
} else if isDateString(lit) {
|
||||
|
@ -1688,7 +1693,7 @@ func isDateString(s string) bool { return dateStringRegexp.MatchString(s) }
|
|||
func isDateTimeString(s string) bool { return dateTimeStringRegexp.MatchString(s) }
|
||||
|
||||
var dateStringRegexp = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}$`)
|
||||
var dateTimeStringRegexp = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?$`)
|
||||
var dateTimeStringRegexp = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}.+`)
|
||||
|
||||
// ErrInvalidDuration is returned when parsing a malformatted duration.
|
||||
var ErrInvalidDuration = errors.New("invalid duration")
|
||||
|
|
88
server.go
88
server.go
|
@ -11,6 +11,7 @@ import (
|
|||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
@ -2037,6 +2038,7 @@ func (s *Server) planSelectStatement(stmt *influxql.SelectStatement) (*influxql.
|
|||
|
||||
// Plan query.
|
||||
p := influxql.NewPlanner(s)
|
||||
|
||||
return p.Plan(stmt)
|
||||
}
|
||||
|
||||
|
@ -2968,9 +2970,12 @@ func HashPassword(password string) ([]byte, error) {
|
|||
type ContinuousQuery struct {
|
||||
Query string `json:"query"`
|
||||
|
||||
mu sync.Mutex
|
||||
cq *influxql.CreateContinuousQueryStatement
|
||||
lastRun time.Time
|
||||
mu sync.Mutex
|
||||
cq *influxql.CreateContinuousQueryStatement
|
||||
lastRun time.Time
|
||||
intoDB string
|
||||
intoRP string
|
||||
intoMeasurement string
|
||||
}
|
||||
|
||||
// NewContinuousQuery returns a ContinuousQuery object with a parsed influxql.CreateContinuousQueryStatement
|
||||
|
@ -2985,15 +2990,36 @@ func NewContinuousQuery(q string) (*ContinuousQuery, error) {
|
|||
return nil, errors.New("query isn't a continuous query")
|
||||
}
|
||||
|
||||
return &ContinuousQuery{
|
||||
cquery := &ContinuousQuery{
|
||||
Query: q,
|
||||
cq: cq,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// set which database and retention policy, and measuremet a CQ is writing into
|
||||
a, err := influxql.SplitIdent(cq.Source.Target.Measurement)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// set the default into database to the same as the from database
|
||||
cquery.intoDB = cq.Database
|
||||
|
||||
if len(a) == 1 { // into only set the measurement name. keep default db and rp
|
||||
cquery.intoMeasurement = a[0]
|
||||
} else if len(a) == 2 { // into set the rp and the measurement
|
||||
cquery.intoRP = a[0]
|
||||
cquery.intoMeasurement = a[1]
|
||||
} else { // into set db, rp, and measurement
|
||||
cquery.intoDB = a[0]
|
||||
cquery.intoRP = a[1]
|
||||
cquery.intoMeasurement = a[2]
|
||||
}
|
||||
|
||||
return cquery, nil
|
||||
}
|
||||
|
||||
// applyCreateContinuousQueryCommand adds the continuous query to the database object and saves it to the metastore
|
||||
func (s *Server) applyCreateContinuousQueryCommand(m *messaging.Message) error {
|
||||
fmt.Println("applyCreateContinuousQueryCommand")
|
||||
var c createContinuousQueryCommand
|
||||
mustUnmarshalJSON(m.Data, &c)
|
||||
|
||||
|
@ -3002,12 +3028,21 @@ func (s *Server) applyCreateContinuousQueryCommand(m *messaging.Message) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// normalize the select statement in the CQ so that it has the database and retention policy inserted
|
||||
if err := s.NormalizeStatement(cq.cq.Source, cq.cq.Database); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// ensure the into database exists
|
||||
if s.databases[cq.intoDB] == nil {
|
||||
return ErrDatabaseNotFound
|
||||
}
|
||||
|
||||
// Retrieve the database.
|
||||
db := s.databases[cq.cq.Database]
|
||||
// TODO: we need to do a check to make sure the INTO database is present.
|
||||
if db == nil {
|
||||
return ErrDatabaseNotFound
|
||||
} else if db.continuousQueryByName(cq.cq.Name) != nil {
|
||||
|
@ -3028,12 +3063,16 @@ func (s *Server) applyCreateContinuousQueryCommand(m *messaging.Message) error {
|
|||
// RunContinuousQueries will run any continuous queries that are due to run and write the
|
||||
// results back into the database
|
||||
func (s *Server) RunContinuousQueries() error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
// s.mu.RLock()
|
||||
// defer s.mu.RUnlock()
|
||||
|
||||
for _, d := range s.databases {
|
||||
for _, c := range d.continuousQueries {
|
||||
if s.shouldRunContinuousQuery(c) {
|
||||
// set the into retention policy based on what is now the default
|
||||
if c.intoRP == "" {
|
||||
c.intoRP = d.defaultRetentionPolicy
|
||||
}
|
||||
go func(cq *ContinuousQuery) {
|
||||
s.runContinuousQuery(c)
|
||||
}(c)
|
||||
|
@ -3048,9 +3087,6 @@ func (s *Server) RunContinuousQueries() error {
|
|||
// lastRunTime of the CQ and the rules for when to run set through the config to determine
|
||||
// if this CQ should be run
|
||||
func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool {
|
||||
cq.mu.Lock()
|
||||
defer cq.mu.Unlock()
|
||||
|
||||
// if it's not aggregated we don't run it
|
||||
if !cq.cq.Source.Aggregated() {
|
||||
return false
|
||||
|
@ -3126,9 +3162,11 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
|
|||
|
||||
// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
|
||||
func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
||||
log.Printf("cq run: %s %s\n", cq.cq.Database, cq.cq.Source.String())
|
||||
warn("> cq run: ", cq.cq.Database, cq.cq.Source.String())
|
||||
|
||||
e, err := s.planSelectStatement(cq.cq.Source, cq.cq.Database)
|
||||
warn("> planned")
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -3141,8 +3179,7 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
|||
|
||||
// Read all rows from channel and write them in
|
||||
// TODO paul: fill in db and retention policy when CQ parsing gets updated
|
||||
db := ""
|
||||
retentionPolicy := ""
|
||||
warn("cq.start.empty.ch")
|
||||
for row := range ch {
|
||||
warn("row: ", row)
|
||||
points, err := s.convertRowToPoints(row)
|
||||
|
@ -3150,13 +3187,17 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
|||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO corylanou: implement batch writing
|
||||
for _, p := range points {
|
||||
_, err := s.WriteSeries(db, retentionPolicy, []Point{*p})
|
||||
warn("> ", p)
|
||||
}
|
||||
|
||||
if len(points) > 0 {
|
||||
_, err = s.WriteSeries(cq.intoDB, cq.intoRP, points)
|
||||
if err != nil {
|
||||
log.Printf("cq write error: %s on: %s\n", err, p)
|
||||
log.Printf("cq err: %s", err)
|
||||
}
|
||||
} else {
|
||||
warn("> empty points")
|
||||
}
|
||||
}
|
||||
warn("cq.run.write")
|
||||
|
@ -3165,7 +3206,7 @@ func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
|||
}
|
||||
|
||||
// convertRowToPoints will convert a query result Row into Points that can be written back in
|
||||
func (s *Server) convertRowToPoints(row *influxql.Row) ([]*Point, error) {
|
||||
func (s *Server) convertRowToPoints(row *influxql.Row) ([]Point, error) {
|
||||
// figure out which parts of the result are the time and which are the fields
|
||||
timeIndex := -1
|
||||
fieldIndexes := make(map[string]int)
|
||||
|
@ -3181,13 +3222,16 @@ func (s *Server) convertRowToPoints(row *influxql.Row) ([]*Point, error) {
|
|||
return nil, errors.New("cq error finding time index in result")
|
||||
}
|
||||
|
||||
points := make([]*Point, 0, len(row.Values))
|
||||
points := make([]Point, 0, len(row.Values))
|
||||
for _, v := range row.Values {
|
||||
vals := make(map[string]interface{})
|
||||
for fieldName, fieldIndex := range fieldIndexes {
|
||||
vals[fieldName] = v[fieldIndex]
|
||||
}
|
||||
|
||||
warn("> ", row)
|
||||
warn("> ", reflect.TypeOf(v[timeIndex]))
|
||||
warn("> ", vals)
|
||||
p := &Point{
|
||||
Name: row.Name,
|
||||
Tags: row.Tags,
|
||||
|
@ -3195,7 +3239,7 @@ func (s *Server) convertRowToPoints(row *influxql.Row) ([]*Point, error) {
|
|||
Values: vals,
|
||||
}
|
||||
|
||||
points = append(points, p)
|
||||
points = append(points, *p)
|
||||
}
|
||||
|
||||
return points, nil
|
||||
|
|
|
@ -1111,6 +1111,7 @@ func TestServer_CreateContinuousQuery(t *testing.T) {
|
|||
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s.SetDefaultRetentionPolicy("foo", "bar")
|
||||
|
||||
// create and check
|
||||
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END"
|
||||
|
@ -1126,8 +1127,8 @@ func TestServer_CreateContinuousQuery(t *testing.T) {
|
|||
queries := s.ContinuousQueries("foo")
|
||||
cqObj, _ := influxdb.NewContinuousQuery(q)
|
||||
expected := []*influxdb.ContinuousQuery{cqObj}
|
||||
if !reflect.DeepEqual(queries, expected) {
|
||||
t.Fatalf("query not saved:\n\texp: %s\ngot: %s", mustMarshalJSON(expected), mustMarshalJSON(queries))
|
||||
if mustMarshalJSON(expected) != mustMarshalJSON(queries) {
|
||||
t.Fatalf("query not saved:\n\texp: %s\n\tgot: %s", mustMarshalJSON(expected), mustMarshalJSON(queries))
|
||||
}
|
||||
s.Restart()
|
||||
|
||||
|
@ -1183,8 +1184,8 @@ func TestServer_RunContinuousQueries(t *testing.T) {
|
|||
s.ComputeRunsPerInterval = 5
|
||||
s.ComputeNoMoreThan = 2 * time.Second
|
||||
|
||||
// create and check
|
||||
q := `CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT mean(value) INTO cpu_region FROM "foo"."raw".cpu GROUP BY time(5s), region END`
|
||||
// create cq and check
|
||||
q := `CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT mean(value) INTO cpu_region FROM cpu GROUP BY time(5s), region END`
|
||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("error parsing query %s", err.Error())
|
||||
|
@ -1193,16 +1194,26 @@ func TestServer_RunContinuousQueries(t *testing.T) {
|
|||
if err := s.CreateContinuousQuery(cq); err != nil {
|
||||
t.Fatalf("error creating continuous query %s", err.Error())
|
||||
}
|
||||
if err := s.RunContinuousQueries(); err != nil {
|
||||
t.Fatalf("error running cqs when no data exists: %s", err.Error())
|
||||
}
|
||||
|
||||
// Write series with one point to the database.
|
||||
now := time.Now().UTC()
|
||||
fmt.Println("TIME: ", now.UTC().Format(influxql.DateTimeFormat))
|
||||
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: now, Values: map[string]interface{}{"value": float64(20)}}})
|
||||
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: now, Values: map[string]interface{}{"value": float64(30)}}})
|
||||
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west"}, Timestamp: now, Values: map[string]interface{}{"value": float64(100)}}})
|
||||
|
||||
start := time.Now().Round(time.Minute * 5).Add(-time.Minute * 5)
|
||||
end := start.Add(time.Minute * 5)
|
||||
cond := fmt.Sprintf("time >= '%s' AND time < '%s'", start.UTC().Format(time.RFC3339Nano), end.UTC().Format(time.RFC3339Nano))
|
||||
q1, _ := influxql.NewParser(strings.NewReader(fmt.Sprintf(`SELECT mean(value) FROM "foo"."raw"."cpu" WHERE %s GROUP BY time(5s), region`, cond))).ParseQuery()
|
||||
fmt.Println("ASDF: ", q1.String())
|
||||
r1 := s.ExecuteQuery(q1, "foo", nil)
|
||||
fmt.Println("RESULTS: ", r1.Results[0])
|
||||
|
||||
time.Sleep(time.Second * 2)
|
||||
// TODO: figure out how to actually test this
|
||||
// t.Skip("pending")
|
||||
fmt.Println("CQ 1")
|
||||
s.RunContinuousQueries()
|
||||
fmt.Println("CQ 2")
|
||||
|
|
Loading…
Reference in New Issue