implement continuous queries as regular execs of into queries.

Now that we have into queries, we can implement them as regular
queries that are just run on a timer.
pull/4409/head
Daniel Morsing 2015-10-13 13:49:34 +00:00
parent 62dff895e2
commit 822af73f88
3 changed files with 22 additions and 218 deletions

View File

@ -302,7 +302,6 @@ func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
srv := continuous_querier.NewService(c)
srv.MetaStore = s.MetaStore
srv.QueryExecutor = s.QueryExecutor
srv.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
}

View File

@ -11,10 +11,8 @@ import (
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
)
@ -48,11 +46,6 @@ type metaStore interface {
Database(name string) (*meta.DatabaseInfo, error)
}
// pointsWriter is an internal interface to make testing easier.
type pointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
}
// RunRequest is a request to run one or more CQs.
type RunRequest struct {
// Now tells the CQ serivce what the current time is.
@ -79,7 +72,6 @@ func (rr *RunRequest) matches(cq *meta.ContinuousQueryInfo) bool {
type Service struct {
MetaStore metaStore
QueryExecutor queryExecutor
PointsWriter pointsWriter
Config *Config
RunInterval time.Duration
// RunCh can be used by clients to signal service to run CQs.
@ -119,7 +111,6 @@ func (s *Service) Open() error {
assert(s.MetaStore != nil, "MetaStore is nil")
assert(s.QueryExecutor != nil, "QueryExecutor is nil")
assert(s.PointsWriter != nil, "PointsWriter is nil")
s.stop = make(chan struct{})
s.wg = &sync.WaitGroup{}
@ -331,104 +322,17 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
if err != nil {
return err
}
// Read all rows from the result channel.
points := make([]models.Point, 0, 100)
for result := range ch {
if result.Err != nil {
return result.Err
}
for _, row := range result.Series {
// Get the measurement name for the result.
measurement := cq.intoMeasurement()
if measurement == "" {
measurement = row.Name
}
// Convert the result row to points.
part, err := s.convertRowToPoints(measurement, row)
if err != nil {
log.Println(err)
continue
}
if len(part) == 0 {
continue
}
// If the points have any nil values, can't write.
// This happens if the CQ is created and running before data is written to the measurement.
for _, p := range part {
fields := p.Fields()
for _, v := range fields {
if v == nil {
return nil
}
}
}
points = append(points, part...)
}
// There is only one statement, so we will only ever receive one result
res, ok := <-ch
if !ok {
panic("result channel was closed")
}
if len(points) == 0 {
return nil
if res.Err != nil {
return res.Err
}
// Create a write request for the points.
req := &cluster.WritePointsRequest{
Database: cq.intoDB(),
RetentionPolicy: cq.intoRP(),
ConsistencyLevel: cluster.ConsistencyLevelAny,
Points: points,
}
// Write the request.
if err := s.PointsWriter.WritePoints(req); err != nil {
s.Logger.Println(err)
return err
}
s.statMap.Add(statPointsWritten, int64(len(points)))
if s.loggingEnabled {
s.Logger.Printf("wrote %d point(s) to %s.%s", len(points), cq.intoDB(), cq.intoRP())
}
return nil
}
// convertRowToPoints will convert a query result Row into Points that can be written back in.
// Used for continuous and INTO queries
func (s *Service) convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
// figure out which parts of the result are the time and which are the fields
timeIndex := -1
fieldIndexes := make(map[string]int)
for i, c := range row.Columns {
if c == "time" {
timeIndex = i
} else {
fieldIndexes[c] = i
}
}
if timeIndex == -1 {
return nil, errors.New("error finding time index in result")
}
points := make([]models.Point, 0, len(row.Values))
for _, v := range row.Values {
vals := make(map[string]interface{})
for fieldName, fieldIndex := range fieldIndexes {
vals[fieldName] = v[fieldIndex]
}
p := models.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))
points = append(points, p)
}
return points, nil
}
// ContinuousQuery is a local wrapper / helper around continuous queries.
type ContinuousQuery struct {
Database string
@ -437,16 +341,8 @@ type ContinuousQuery struct {
q *influxql.SelectStatement
}
func (cq *ContinuousQuery) intoDB() string {
if cq.q.Target.Measurement.Database != "" {
return cq.q.Target.Measurement.Database
}
return cq.Database
}
func (cq *ContinuousQuery) intoRP() string { return cq.q.Target.Measurement.RetentionPolicy }
func (cq *ContinuousQuery) setIntoRP(rp string) { cq.q.Target.Measurement.RetentionPolicy = rp }
func (cq *ContinuousQuery) intoMeasurement() string { return cq.q.Target.Measurement.Name }
func (cq *ContinuousQuery) intoRP() string { return cq.q.Target.Measurement.RetentionPolicy }
func (cq *ContinuousQuery) setIntoRP(rp string) { cq.q.Target.Measurement.RetentionPolicy = rp }
// NewContinuousQuery returns a ContinuousQuery object with a parsed influxql.CreateContinuousQueryStatement
func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*ContinuousQuery, error) {

View File

@ -5,7 +5,6 @@ import (
"fmt"
"io/ioutil"
"log"
"strings"
"sync"
"testing"
"time"
@ -38,95 +37,6 @@ func TestOpenAndClose(t *testing.T) {
}
}
// Test ExecuteContinuousQuery.
func TestExecuteContinuousQuery(t *testing.T) {
s := NewTestService(t)
dbis, _ := s.MetaStore.Databases()
dbi := dbis[0]
cqi := dbi.ContinuousQueries[0]
pointCnt := 100
qe := s.QueryExecutor.(*QueryExecutor)
qe.Results = []*influxql.Result{genResult(1, pointCnt)}
pw := s.PointsWriter.(*PointsWriter)
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
if len(p.Points) != pointCnt {
return fmt.Errorf("exp = %d, got = %d", pointCnt, len(p.Points))
}
return nil
}
err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
if err != nil {
t.Error(err)
}
}
// Test ExecuteContinuousQuery when INTO measurements are taken from the FROM clause.
func TestExecuteContinuousQuery_ReferenceSource(t *testing.T) {
s := NewTestService(t)
dbis, _ := s.MetaStore.Databases()
dbi := dbis[2]
cqi := dbi.ContinuousQueries[0]
rowCnt := 2
pointCnt := 1
qe := s.QueryExecutor.(*QueryExecutor)
qe.Results = []*influxql.Result{genResult(rowCnt, pointCnt)}
pw := s.PointsWriter.(*PointsWriter)
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
if len(p.Points) != pointCnt*rowCnt {
return fmt.Errorf("exp = %d, got = %d", pointCnt, len(p.Points))
}
exp := "cpu,host=server01 value=0"
got := p.Points[0].String()
if !strings.Contains(got, exp) {
return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp)
}
exp = "cpu2,host=server01 value=0"
got = p.Points[1].String()
if !strings.Contains(got, exp) {
return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp)
}
return nil
}
err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
if err != nil {
t.Error(err)
}
}
// Test the service happy path.
func TestContinuousQueryService(t *testing.T) {
s := NewTestService(t)
pointCnt := 100
qe := s.QueryExecutor.(*QueryExecutor)
qe.Results = []*influxql.Result{genResult(1, pointCnt)}
pw := s.PointsWriter.(*PointsWriter)
ch := make(chan int, 10)
defer close(ch)
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
ch <- len(p.Points)
return nil
}
s.Open()
if cnt, err := waitInt(ch, time.Second); err != nil {
t.Error(err)
} else if cnt != pointCnt {
t.Errorf("exp = %d, got = %d", pointCnt, cnt)
}
s.Close()
}
// Test Run method.
func TestContinuousQueryService_Run(t *testing.T) {
s := NewTestService(t)
@ -148,7 +58,9 @@ func TestContinuousQueryService_Run(t *testing.T) {
if callCnt >= expectCallCnt {
done <- struct{}{}
}
return nil, nil
dummych := make(chan *influxql.Result, 1)
dummych <- &influxql.Result{}
return dummych, nil
}
s.Open()
@ -280,7 +192,6 @@ func NewTestService(t *testing.T) *Service {
ms := NewMetaStore(t)
s.MetaStore = ms
s.QueryExecutor = NewQueryExecutor(t)
s.PointsWriter = NewPointsWriter(t)
s.RunInterval = time.Millisecond
// Set Logger to write to dev/null so stdout isn't polluted.
@ -406,21 +317,19 @@ func (ms *MetaStore) CreateContinuousQuery(database, name, query string) error {
// QueryExecutor is a mock query executor.
type QueryExecutor struct {
ExecuteQueryFn func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
Results []*influxql.Result
ResultInterval time.Duration
Err error
ErrAfterResult int
StopRespondingAfter int
t *testing.T
ExecuteQueryFn func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
Results []*influxql.Result
ResultInterval time.Duration
Err error
ErrAfterResult int
t *testing.T
}
// NewQueryExecutor returns a *QueryExecutor.
func NewQueryExecutor(t *testing.T) *QueryExecutor {
return &QueryExecutor{
ErrAfterResult: -1,
StopRespondingAfter: -1,
t: t,
ErrAfterResult: -1,
t: t,
}
}
@ -450,15 +359,15 @@ func (qe *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, ch
ch <- &influxql.Result{Err: qe.Err}
close(ch)
return
} else if i == qe.StopRespondingAfter {
qe.t.Log("ExecuteQuery(): StopRespondingAfter")
return
}
ch <- r
n++
time.Sleep(qe.ResultInterval)
}
qe.t.Logf("ExecuteQuery(): all (%d) results sent", n)
if n == 0 {
ch <- &influxql.Result{Err: qe.Err}
}
close(ch)
}()