fix #2733: hook CQs back in
parent
041b31d6b3
commit
092bc3fd2d
|
@ -137,7 +137,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Content-Type", "")
|
||||
req.Header.Set("User-Agent", c.userAgent)
|
||||
if c.username != "" {
|
||||
req.SetBasicAuth(c.username, c.password)
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/services/admin"
|
||||
"github.com/influxdb/influxdb/services/collectd"
|
||||
"github.com/influxdb/influxdb/services/continuous_querier"
|
||||
"github.com/influxdb/influxdb/services/graphite"
|
||||
"github.com/influxdb/influxdb/services/httpd"
|
||||
"github.com/influxdb/influxdb/services/opentsdb"
|
||||
|
@ -65,6 +66,7 @@ func NewServer(c *Config) *Server {
|
|||
s.appendOpenTSDBService(c.OpenTSDB)
|
||||
s.appendUDPService(c.UDP)
|
||||
s.appendRetentionPolicyService(c.Retention)
|
||||
s.appendContinuousQueryService(c.ContinuousQuery)
|
||||
for _, g := range c.Graphites {
|
||||
s.appendGraphiteService(g)
|
||||
}
|
||||
|
@ -136,6 +138,13 @@ func (s *Server) appendUDPService(c udp.Config) {
|
|||
return
|
||||
}
|
||||
srv := udp.NewService(c)
|
||||
srv.Server.PointsWriter = s.PointsWriter
|
||||
}
|
||||
|
||||
func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
|
||||
srv := continuous_querier.NewService(c)
|
||||
srv.MetaStore = s.MetaStore
|
||||
srv.QueryExecutor = s.QueryExecutor
|
||||
srv.PointsWriter = s.PointsWriter
|
||||
s.Services = append(s.Services, srv)
|
||||
}
|
||||
|
|
|
@ -1 +1,343 @@
|
|||
package continuous_querier
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
const (
|
||||
// When planning a select statement, passing zero tells it not to chunk results. Only applies to raw queries
|
||||
NoChunkingSize = 0
|
||||
)
|
||||
|
||||
// Service manages continuous query execution.
|
||||
type Service struct {
|
||||
MetaStore *meta.Store
|
||||
QueryExecutor *tsdb.QueryExecutor
|
||||
PointsWriter *cluster.PointsWriter
|
||||
Config *Config
|
||||
RunInterval time.Duration
|
||||
Logger *log.Logger
|
||||
// lastRuns maps CQ name to last time it was run.
|
||||
lastRuns map[string]time.Time
|
||||
stop chan struct{}
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewService returns a new instance of Service.
|
||||
func NewService(c Config) *Service {
|
||||
s := &Service{
|
||||
Config: &c,
|
||||
RunInterval: time.Second,
|
||||
Logger: log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags),
|
||||
lastRuns: map[string]time.Time{},
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Open starts the service.
|
||||
func (s *Service) Open() error {
|
||||
if s.stop != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.MetaStore == nil {
|
||||
panic("MetaStore is nil")
|
||||
} else if s.QueryExecutor == nil {
|
||||
panic("QueryExecutor is nil")
|
||||
} else if s.PointsWriter == nil {
|
||||
panic("PointsWriter is nil")
|
||||
}
|
||||
|
||||
s.stop = make(chan struct{})
|
||||
s.wg = &sync.WaitGroup{}
|
||||
s.wg.Add(1)
|
||||
go s.backgroundLoop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close stops the service.
|
||||
func (s *Service) Close() error {
|
||||
if s.stop == nil {
|
||||
return nil
|
||||
}
|
||||
close(s.stop)
|
||||
s.wg.Wait()
|
||||
s.wg = nil
|
||||
s.stop = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// backgroundLoop runs on a go routine and periodically executes CQs.
|
||||
func (s *Service) backgroundLoop() {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-s.stop:
|
||||
return
|
||||
case <-time.After(s.RunInterval):
|
||||
if s.MetaStore.IsLeader() {
|
||||
s.runContinuousQueries()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runContinuousQueries gets CQs from the meta store and runs them.
|
||||
func (s *Service) runContinuousQueries() {
|
||||
// Get list of all databases.
|
||||
dbs, err := s.MetaStore.Databases()
|
||||
if err != nil {
|
||||
s.Logger.Println("error getting databases")
|
||||
return
|
||||
}
|
||||
// Loop through all databases executing CQs.
|
||||
for _, db := range dbs {
|
||||
// TODO: distribute across nodes
|
||||
for _, cq := range db.ContinuousQueries {
|
||||
if err := s.ExecuteContinuousQuery(&db, &cq); err != nil {
|
||||
s.Logger.Println(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ExecuteContinuousQuery executes a single CQ.
|
||||
func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo) error {
|
||||
// TODO: re-enable stats
|
||||
//s.stats.Inc("continuousQueryExecuted")
|
||||
|
||||
// Local wrapper / helper.
|
||||
cq, err := NewContinuousQuery(dbi.Name, cqi)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get the last time this CQ was run from the service's cache.
|
||||
cq.LastRun = s.lastRuns[cqi.Name]
|
||||
|
||||
// Set the retention policy to default if it wasn't specified in the query.
|
||||
if cq.intoRP() == "" {
|
||||
cq.setIntoRP(dbi.DefaultRetentionPolicy)
|
||||
}
|
||||
|
||||
// See if this query needs to be run.
|
||||
computeNoMoreThan := time.Duration(s.Config.ComputeNoMoreThan)
|
||||
if !cq.shouldRunContinuousQuery(s.Config.ComputeRunsPerInterval, computeNoMoreThan) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// We're about to run the query so store the time.
|
||||
now := time.Now()
|
||||
cq.LastRun = now
|
||||
s.lastRuns[cqi.Name] = now
|
||||
|
||||
// Get the group by interval.
|
||||
interval, err := cq.q.GroupByInterval()
|
||||
if err != nil || interval == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Calculate and set the time range for the query.
|
||||
startTime := now.Round(interval)
|
||||
if startTime.UnixNano() > now.UnixNano() {
|
||||
startTime = startTime.Add(-interval)
|
||||
}
|
||||
|
||||
if err := cq.q.SetTimeRange(startTime, startTime.Add(interval)); err != nil {
|
||||
s.Logger.Printf("error setting time range: %s\n", err)
|
||||
}
|
||||
|
||||
// Do the actual processing of the query & writing of results.
|
||||
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
|
||||
s.Logger.Printf("error: %s. running: %s\n", err, cq.q.String())
|
||||
}
|
||||
|
||||
recomputeNoOlderThan := time.Duration(s.Config.RecomputeNoOlderThan)
|
||||
|
||||
for i := 0; i < s.Config.RecomputePreviousN; i++ {
|
||||
// if we're already more time past the previous window than we're going to look back, stop
|
||||
if now.Sub(startTime) > recomputeNoOlderThan {
|
||||
return nil
|
||||
}
|
||||
newStartTime := startTime.Add(-interval)
|
||||
|
||||
if err := cq.q.SetTimeRange(newStartTime, startTime); err != nil {
|
||||
s.Logger.Printf("error setting time range: %s\n", err)
|
||||
}
|
||||
|
||||
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
|
||||
s.Logger.Printf("error during recompute previous: %s. running: %s\n", err, cq.q.String())
|
||||
}
|
||||
|
||||
startTime = newStartTime
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
|
||||
func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
||||
// Wrap the CQ's inner SELECT statement in a Query for the QueryExecutor.
|
||||
q := &influxql.Query{
|
||||
Statements: influxql.Statements{cq.q},
|
||||
}
|
||||
|
||||
// Execute the SELECT.
|
||||
ch, err := s.QueryExecutor.ExecuteQuery(q, cq.Database, NoChunkingSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read all rows from the result channel.
|
||||
for result := range ch {
|
||||
if result.Err != nil {
|
||||
return result.Err
|
||||
}
|
||||
|
||||
for _, row := range result.Series {
|
||||
// Convert the result row to points.
|
||||
points, err := s.convertRowToPoints(cq.intoMeasurement(), row)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(points) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// If the points have any nil values, can't write.
|
||||
// This happens if the CQ is created and running before data is written to the measurement.
|
||||
for _, p := range points {
|
||||
fields := p.Fields()
|
||||
for _, v := range fields {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create a write request for the points.
|
||||
req := &cluster.WritePointsRequest{
|
||||
Database: cq.intoDB(),
|
||||
RetentionPolicy: cq.intoRP(),
|
||||
ConsistencyLevel: cluster.ConsistencyLevelAny,
|
||||
Points: points,
|
||||
}
|
||||
|
||||
// Write the request.
|
||||
if err := s.PointsWriter.WritePoints(req); err != nil {
|
||||
s.Logger.Println(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// convertRowToPoints will convert a query result Row into Points that can be written back in.
|
||||
// Used for continuous and INTO queries
|
||||
func (s *Service) convertRowToPoints(measurementName string, row *influxql.Row) ([]tsdb.Point, error) {
|
||||
// figure out which parts of the result are the time and which are the fields
|
||||
timeIndex := -1
|
||||
fieldIndexes := make(map[string]int)
|
||||
for i, c := range row.Columns {
|
||||
if c == "time" {
|
||||
timeIndex = i
|
||||
} else {
|
||||
fieldIndexes[c] = i
|
||||
}
|
||||
}
|
||||
|
||||
if timeIndex == -1 {
|
||||
return nil, errors.New("error finding time index in result")
|
||||
}
|
||||
|
||||
points := make([]tsdb.Point, 0, len(row.Values))
|
||||
for _, v := range row.Values {
|
||||
vals := make(map[string]interface{})
|
||||
for fieldName, fieldIndex := range fieldIndexes {
|
||||
vals[fieldName] = v[fieldIndex]
|
||||
}
|
||||
|
||||
p := tsdb.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))
|
||||
|
||||
points = append(points, p)
|
||||
}
|
||||
|
||||
return points, nil
|
||||
}
|
||||
|
||||
// ContinuousQuery is a local wrapper / helper around continuous queries.
|
||||
type ContinuousQuery struct {
|
||||
Database string
|
||||
Info *meta.ContinuousQueryInfo
|
||||
LastRun time.Time
|
||||
q *influxql.SelectStatement
|
||||
}
|
||||
|
||||
func (cq *ContinuousQuery) intoDB() string { return cq.q.Target.Measurement.Database }
|
||||
func (cq *ContinuousQuery) intoRP() string { return cq.q.Target.Measurement.RetentionPolicy }
|
||||
func (cq *ContinuousQuery) setIntoRP(rp string) { cq.q.Target.Measurement.RetentionPolicy = rp }
|
||||
func (cq *ContinuousQuery) intoMeasurement() string { return cq.q.Target.Measurement.Name }
|
||||
|
||||
// NewContinuousQuery returns a ContinuousQuery object with a parsed influxql.CreateContinuousQueryStatement
|
||||
func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*ContinuousQuery, error) {
|
||||
stmt, err := influxql.NewParser(strings.NewReader(cqi.Query)).ParseStatement()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
q, ok := stmt.(*influxql.SelectStatement)
|
||||
if !ok {
|
||||
return nil, errors.New("query isn't a valid continuous query")
|
||||
}
|
||||
|
||||
cquery := &ContinuousQuery{
|
||||
Database: database,
|
||||
Info: cqi,
|
||||
q: q,
|
||||
}
|
||||
|
||||
return cquery, nil
|
||||
}
|
||||
|
||||
// shouldRunContinuousQuery returns true if the CQ should be schedule to run. It will use the
|
||||
// lastRunTime of the CQ and the rules for when to run set through the config to determine
|
||||
// if this CQ should be run
|
||||
func (cq *ContinuousQuery) shouldRunContinuousQuery(runsPerInterval int, noMoreThan time.Duration) bool {
|
||||
// if it's not aggregated we don't run it
|
||||
if cq.q.IsRawQuery {
|
||||
return false
|
||||
}
|
||||
|
||||
// since it's aggregated we need to figure how often it should be run
|
||||
interval, err := cq.q.GroupByInterval()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// determine how often we should run this continuous query.
|
||||
// group by time / the number of times to compute
|
||||
computeEvery := time.Duration(interval.Nanoseconds()/int64(runsPerInterval)) * time.Nanosecond
|
||||
// make sure we're running no more frequently than the setting in the config
|
||||
if computeEvery < noMoreThan {
|
||||
computeEvery = noMoreThan
|
||||
}
|
||||
|
||||
// if we've passed the amount of time since the last run, do it up
|
||||
if cq.LastRun.Add(computeEvery).UnixNano() <= time.Now().UnixNano() {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue