2015-06-05 06:00:54 +00:00
package continuous_querier
import (
"errors"
"fmt"
2016-12-01 18:26:23 +00:00
"os"
2015-06-06 00:23:44 +00:00
"sync"
2015-06-05 06:00:54 +00:00
"testing"
"time"
2016-02-10 17:26:18 +00:00
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
2017-02-17 23:17:22 +00:00
"github.com/uber-go/zap"
2015-06-05 06:00:54 +00:00
)
var (
2015-11-22 19:23:56 +00:00
errExpected = errors . New ( "expected error" )
errUnexpected = errors . New ( "unexpected error" )
2015-06-05 06:00:54 +00:00
)
// Test closing never opened, open, open already open, close, and close already closed.
func TestOpenAndClose ( t * testing . T ) {
2015-06-05 18:40:22 +00:00
s := NewTestService ( t )
2015-06-05 06:00:54 +00:00
if err := s . Close ( ) ; err != nil {
t . Error ( err )
} else if err = s . Open ( ) ; err != nil {
t . Error ( err )
} else if err = s . Open ( ) ; err != nil {
t . Error ( err )
} else if err = s . Close ( ) ; err != nil {
t . Error ( err )
} else if err = s . Close ( ) ; err != nil {
t . Error ( err )
}
}
2015-06-05 23:19:44 +00:00
// Test Run method.
2015-08-28 15:43:35 +00:00
func TestContinuousQueryService_Run ( t * testing . T ) {
2015-06-05 23:19:44 +00:00
s := NewTestService ( t )
// Set RunInterval high so we can trigger using Run method.
s . RunInterval = 10 * time . Minute
done := make ( chan struct { } )
2015-08-27 23:23:21 +00:00
expectCallCnt := 3
2015-06-05 23:19:44 +00:00
callCnt := 0
2016-03-31 22:12:29 +00:00
// Set a callback for ExecuteStatement.
s . QueryExecutor . StatementExecutor = & StatementExecutor {
2016-06-07 17:28:33 +00:00
ExecuteStatementFn : func ( stmt influxql . Statement , ctx influxql . ExecutionContext ) error {
2016-03-31 22:12:29 +00:00
callCnt ++
if callCnt >= expectCallCnt {
done <- struct { } { }
}
ctx . Results <- & influxql . Result { }
return nil
} ,
2015-06-05 23:19:44 +00:00
}
2015-12-18 20:32:05 +00:00
// Use a custom "now" time since the internals of last run care about
// what the actual time is. Truncate to 10 minutes we are starting on an interval.
now := time . Now ( ) . Truncate ( 10 * time . Minute )
2015-06-05 23:19:44 +00:00
s . Open ( )
// Trigger service to run all CQs.
2015-12-18 20:32:05 +00:00
s . Run ( "" , "" , now )
2015-06-05 23:19:44 +00:00
// Shouldn't time out.
if err := wait ( done , 100 * time . Millisecond ) ; err != nil {
t . Error ( err )
}
// This time it should timeout because ExecuteQuery should not get called again.
if err := wait ( done , 100 * time . Millisecond ) ; err == nil {
t . Error ( "too many queries executed" )
}
s . Close ( )
// Now test just one query.
expectCallCnt = 1
callCnt = 0
s . Open ( )
2015-12-18 20:32:05 +00:00
s . Run ( "db" , "cq" , now )
2015-06-05 23:19:44 +00:00
// Shouldn't time out.
if err := wait ( done , 100 * time . Millisecond ) ; err != nil {
t . Error ( err )
}
// This time it should timeout because ExecuteQuery should not get called again.
if err := wait ( done , 100 * time . Millisecond ) ; err == nil {
t . Error ( "too many queries executed" )
}
s . Close ( )
}
2015-12-18 20:32:05 +00:00
func TestContinuousQueryService_ResampleOptions ( t * testing . T ) {
s := NewTestService ( t )
2016-01-20 19:53:53 +00:00
mc := NewMetaClient ( t )
mc . CreateDatabase ( "db" , "" )
mc . CreateContinuousQuery ( "db" , "cq" , ` CREATE CONTINUOUS QUERY cq ON db RESAMPLE EVERY 10s FOR 2m BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(1m) END ` )
s . MetaClient = mc
2015-12-18 20:32:05 +00:00
2016-04-30 22:04:38 +00:00
db := s . MetaClient . Database ( "db" )
2015-12-18 20:32:05 +00:00
cq , err := NewContinuousQuery ( db . Name , & db . ContinuousQueries [ 0 ] )
if err != nil {
t . Fatal ( err )
} else if cq . Resample . Every != 10 * time . Second {
t . Errorf ( "expected resample every to be 10s, got %s" , influxql . FormatDuration ( cq . Resample . Every ) )
} else if cq . Resample . For != 2 * time . Minute {
t . Errorf ( "expected resample for 2m, got %s" , influxql . FormatDuration ( cq . Resample . For ) )
}
// Set RunInterval high so we can trigger using Run method.
s . RunInterval = 10 * time . Minute
done := make ( chan struct { } )
2016-07-14 16:11:03 +00:00
var expected struct {
min time . Time
max time . Time
}
2015-12-18 20:32:05 +00:00
2016-03-31 22:12:29 +00:00
// Set a callback for ExecuteStatement.
s . QueryExecutor . StatementExecutor = & StatementExecutor {
2016-06-07 17:28:33 +00:00
ExecuteStatementFn : func ( stmt influxql . Statement , ctx influxql . ExecutionContext ) error {
2016-07-14 16:11:03 +00:00
s := stmt . ( * influxql . SelectStatement )
min , max , err := influxql . TimeRange ( s . Condition )
if err != nil {
t . Errorf ( "unexpected error parsing time range: %s" , err )
} else if ! expected . min . Equal ( min ) || ! expected . max . Equal ( max ) {
t . Errorf ( "mismatched time range: got=(%s, %s) exp=(%s, %s)" , min , max , expected . min , expected . max )
2016-03-31 22:12:29 +00:00
}
2016-07-14 16:11:03 +00:00
done <- struct { } { }
2016-03-31 22:12:29 +00:00
ctx . Results <- & influxql . Result { }
return nil
} ,
2015-12-18 20:32:05 +00:00
}
s . Open ( )
defer s . Close ( )
// Set the 'now' time to the start of a 10 minute interval. Then trigger a run.
// This should trigger two queries (one for the current time interval, one for the previous).
2016-07-14 16:11:03 +00:00
now := time . Now ( ) . UTC ( ) . Truncate ( 10 * time . Minute )
expected . min = now . Add ( - 2 * time . Minute )
expected . max = now . Add ( - 1 )
2015-12-18 20:32:05 +00:00
s . RunCh <- & RunRequest { Now : now }
if err := wait ( done , 100 * time . Millisecond ) ; err != nil {
t . Fatal ( err )
}
// Trigger another run 10 seconds later. Another two queries should happen,
// but it will be a different two queries.
2016-07-14 16:11:03 +00:00
expected . min = expected . min . Add ( time . Minute )
expected . max = expected . max . Add ( time . Minute )
2015-12-18 20:32:05 +00:00
s . RunCh <- & RunRequest { Now : now . Add ( 10 * time . Second ) }
if err := wait ( done , 100 * time . Millisecond ) ; err != nil {
t . Fatal ( err )
}
// Reset the time period and send the initial request at 5 seconds after the
// 10 minute mark. There should be exactly one call since the current interval is too
// young and only one interval matches the FOR duration.
2016-07-14 16:11:03 +00:00
expected . min = now . Add ( - time . Minute )
expected . max = now . Add ( - 1 )
2015-12-18 20:32:05 +00:00
s . Run ( "" , "" , now . Add ( 5 * time . Second ) )
if err := wait ( done , 100 * time . Millisecond ) ; err != nil {
t . Fatal ( err )
}
2016-07-14 16:11:03 +00:00
// Send a message 10 minutes later and ensure that the system plays catchup.
expected . max = now . Add ( 10 * time . Minute - 1 )
s . RunCh <- & RunRequest { Now : now . Add ( 10 * time . Minute ) }
if err := wait ( done , 100 * time . Millisecond ) ; err != nil {
t . Fatal ( err )
}
2015-12-18 20:32:05 +00:00
// No overflow should be sent.
if err := wait ( done , 100 * time . Millisecond ) ; err == nil {
t . Error ( "too many queries executed" )
}
}
2016-01-22 14:43:46 +00:00
func TestContinuousQueryService_EveryHigherThanInterval ( t * testing . T ) {
s := NewTestService ( t )
2016-01-25 05:00:51 +00:00
ms := NewMetaClient ( t )
2016-01-22 14:43:46 +00:00
ms . CreateDatabase ( "db" , "" )
ms . CreateContinuousQuery ( "db" , "cq" , ` CREATE CONTINUOUS QUERY cq ON db RESAMPLE EVERY 1m BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(30s) END ` )
2016-01-25 05:00:51 +00:00
s . MetaClient = ms
2016-01-22 14:43:46 +00:00
// Set RunInterval high so we can trigger using Run method.
s . RunInterval = 10 * time . Minute
done := make ( chan struct { } )
2016-07-14 16:11:03 +00:00
var expected struct {
min time . Time
max time . Time
}
2016-01-22 14:43:46 +00:00
// Set a callback for ExecuteQuery.
2016-03-31 22:12:29 +00:00
s . QueryExecutor . StatementExecutor = & StatementExecutor {
2016-06-07 17:28:33 +00:00
ExecuteStatementFn : func ( stmt influxql . Statement , ctx influxql . ExecutionContext ) error {
2016-07-14 16:11:03 +00:00
s := stmt . ( * influxql . SelectStatement )
min , max , err := influxql . TimeRange ( s . Condition )
if err != nil {
t . Errorf ( "unexpected error parsing time range: %s" , err )
} else if ! expected . min . Equal ( min ) || ! expected . max . Equal ( max ) {
t . Errorf ( "mismatched time range: got=(%s, %s) exp=(%s, %s)" , min , max , expected . min , expected . max )
2016-03-31 22:12:29 +00:00
}
2016-07-14 16:11:03 +00:00
done <- struct { } { }
2016-03-31 22:12:29 +00:00
ctx . Results <- & influxql . Result { }
return nil
} ,
2016-01-22 14:43:46 +00:00
}
s . Open ( )
defer s . Close ( )
// Set the 'now' time to the start of a 10 minute interval. Then trigger a run.
// This should trigger two queries (one for the current time interval, one for the previous)
// since the default FOR interval should be EVERY, not the GROUP BY interval.
now := time . Now ( ) . Truncate ( 10 * time . Minute )
2016-07-14 16:11:03 +00:00
expected . min = now . Add ( - time . Minute )
expected . max = now . Add ( - 1 )
2016-01-22 14:43:46 +00:00
s . RunCh <- & RunRequest { Now : now }
if err := wait ( done , 100 * time . Millisecond ) ; err != nil {
t . Fatal ( err )
}
// Trigger 30 seconds later. Nothing should run.
s . RunCh <- & RunRequest { Now : now . Add ( 30 * time . Second ) }
if err := wait ( done , 100 * time . Millisecond ) ; err == nil {
t . Fatal ( "too many queries" )
}
// Run again 1 minute later. Another two queries should run.
2016-07-14 16:11:03 +00:00
expected . min = now
expected . max = now . Add ( time . Minute - 1 )
2016-01-22 14:43:46 +00:00
s . RunCh <- & RunRequest { Now : now . Add ( time . Minute ) }
if err := wait ( done , 100 * time . Millisecond ) ; err != nil {
t . Fatal ( err )
}
// No overflow should be sent.
if err := wait ( done , 100 * time . Millisecond ) ; err == nil {
t . Error ( "too many queries executed" )
}
}
2016-07-19 03:47:43 +00:00
func TestContinuousQueryService_GroupByOffset ( t * testing . T ) {
s := NewTestService ( t )
mc := NewMetaClient ( t )
mc . CreateDatabase ( "db" , "" )
mc . CreateContinuousQuery ( "db" , "cq" , ` CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(1m, 30s) END ` )
s . MetaClient = mc
// Set RunInterval high so we can trigger using Run method.
s . RunInterval = 10 * time . Minute
done := make ( chan struct { } )
var expected struct {
min time . Time
max time . Time
}
// Set a callback for ExecuteStatement.
s . QueryExecutor . StatementExecutor = & StatementExecutor {
ExecuteStatementFn : func ( stmt influxql . Statement , ctx influxql . ExecutionContext ) error {
s := stmt . ( * influxql . SelectStatement )
min , max , err := influxql . TimeRange ( s . Condition )
if err != nil {
t . Errorf ( "unexpected error parsing time range: %s" , err )
} else if ! expected . min . Equal ( min ) || ! expected . max . Equal ( max ) {
t . Errorf ( "mismatched time range: got=(%s, %s) exp=(%s, %s)" , min , max , expected . min , expected . max )
}
done <- struct { } { }
ctx . Results <- & influxql . Result { }
return nil
} ,
}
s . Open ( )
defer s . Close ( )
// Set the 'now' time to the start of a 10 minute interval with a 30 second offset.
// Then trigger a run. This should trigger two queries (one for the current time
// interval, one for the previous).
now := time . Now ( ) . UTC ( ) . Truncate ( 10 * time . Minute ) . Add ( 30 * time . Second )
expected . min = now . Add ( - time . Minute )
expected . max = now . Add ( - 1 )
s . RunCh <- & RunRequest { Now : now }
if err := wait ( done , 100 * time . Millisecond ) ; err != nil {
t . Fatal ( err )
}
}
2015-06-05 18:40:22 +00:00
// Test service when not the cluster leader (CQs shouldn't run).
2015-08-28 15:43:35 +00:00
func TestContinuousQueryService_NotLeader ( t * testing . T ) {
2015-06-05 18:40:22 +00:00
s := NewTestService ( t )
// Set RunInterval high so we can test triggering with the RunCh below.
s . RunInterval = 10 * time . Second
2016-01-20 00:04:42 +00:00
s . MetaClient . ( * MetaClient ) . Leader = false
2015-06-05 18:40:22 +00:00
done := make ( chan struct { } )
2016-03-31 22:12:29 +00:00
// Set a callback for ExecuteStatement. Shouldn't get called because we're not the leader.
s . QueryExecutor . StatementExecutor = & StatementExecutor {
2016-06-07 17:28:33 +00:00
ExecuteStatementFn : func ( stmt influxql . Statement , ctx influxql . ExecutionContext ) error {
2016-03-31 22:12:29 +00:00
done <- struct { } { }
ctx . Results <- & influxql . Result { Err : errUnexpected }
return nil
} ,
2015-06-05 18:40:22 +00:00
}
s . Open ( )
// Trigger service to run CQs.
2015-09-02 17:28:03 +00:00
s . RunCh <- & RunRequest { Now : time . Now ( ) }
2015-06-05 18:40:22 +00:00
// Expect timeout error because ExecuteQuery callback wasn't called.
if err := wait ( done , 100 * time . Millisecond ) ; err == nil {
t . Error ( err )
}
2015-06-05 06:00:54 +00:00
s . Close ( )
}
// Test ExecuteContinuousQuery with invalid queries.
func TestExecuteContinuousQuery_InvalidQueries ( t * testing . T ) {
2015-06-05 18:40:22 +00:00
s := NewTestService ( t )
2016-03-31 22:12:29 +00:00
s . QueryExecutor . StatementExecutor = & StatementExecutor {
2016-06-07 17:28:33 +00:00
ExecuteStatementFn : func ( stmt influxql . Statement , ctx influxql . ExecutionContext ) error {
2016-03-31 22:12:29 +00:00
return errUnexpected
} ,
}
2016-04-30 22:04:38 +00:00
dbis := s . MetaClient . Databases ( )
2015-06-05 06:00:54 +00:00
dbi := dbis [ 0 ]
cqi := dbi . ContinuousQueries [ 0 ]
cqi . Query = ` this is not a query `
2017-03-06 22:06:51 +00:00
if _ , err := s . ExecuteContinuousQuery ( & dbi , & cqi , time . Now ( ) ) ; err == nil {
2015-06-05 06:00:54 +00:00
t . Error ( "expected error but got nil" )
}
// Valid query but invalid continuous query.
cqi . Query = ` SELECT * FROM cpu `
2017-03-06 22:06:51 +00:00
if _ , err := s . ExecuteContinuousQuery ( & dbi , & cqi , time . Now ( ) ) ; err == nil {
2015-06-05 06:00:54 +00:00
t . Error ( "expected error but got nil" )
}
// Group by requires aggregate.
2015-06-05 18:40:22 +00:00
cqi . Query = ` SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s) `
2017-03-06 22:06:51 +00:00
if _ , err := s . ExecuteContinuousQuery ( & dbi , & cqi , time . Now ( ) ) ; err == nil {
2015-06-05 06:00:54 +00:00
t . Error ( "expected error but got nil" )
}
}
// Test ExecuteContinuousQuery when QueryExecutor returns an error.
func TestExecuteContinuousQuery_QueryExecutor_Error ( t * testing . T ) {
2015-06-05 18:40:22 +00:00
s := NewTestService ( t )
2016-03-31 22:12:29 +00:00
s . QueryExecutor . StatementExecutor = & StatementExecutor {
2016-06-07 17:28:33 +00:00
ExecuteStatementFn : func ( stmt influxql . Statement , ctx influxql . ExecutionContext ) error {
2016-03-31 22:12:29 +00:00
return errExpected
} ,
}
2015-06-05 06:00:54 +00:00
2016-04-30 22:04:38 +00:00
dbis := s . MetaClient . Databases ( )
2015-06-05 06:00:54 +00:00
dbi := dbis [ 0 ]
cqi := dbi . ContinuousQueries [ 0 ]
2015-12-18 20:32:05 +00:00
now := time . Now ( ) . Truncate ( 10 * time . Minute )
2017-03-06 22:06:51 +00:00
if _ , err := s . ExecuteContinuousQuery ( & dbi , & cqi , now ) ; err != errExpected {
2015-11-22 19:23:56 +00:00
t . Errorf ( "exp = %s, got = %v" , errExpected , err )
2015-06-05 06:00:54 +00:00
}
}
// NewTestService returns a new *Service with default mock object members.
2015-06-05 18:40:22 +00:00
func NewTestService ( t * testing . T ) * Service {
2015-06-05 06:00:54 +00:00
s := NewService ( NewConfig ( ) )
2016-01-20 00:04:42 +00:00
ms := NewMetaClient ( t )
s . MetaClient = ms
2016-03-31 22:12:29 +00:00
s . QueryExecutor = influxql . NewQueryExecutor ( )
2015-06-05 18:40:22 +00:00
s . RunInterval = time . Millisecond
2015-06-05 06:00:54 +00:00
// Set Logger to write to dev/null so stdout isn't polluted.
2016-12-01 18:26:23 +00:00
if testing . Verbose ( ) {
s . WithLogger ( zap . New (
zap . NewTextEncoder ( ) ,
zap . Output ( os . Stderr ) ,
) )
2015-06-15 13:36:43 +00:00
}
2015-06-05 06:00:54 +00:00
2015-06-05 23:19:44 +00:00
// Add a couple test databases and CQs.
ms . CreateDatabase ( "db" , "rp" )
2015-06-15 13:36:43 +00:00
ms . CreateContinuousQuery ( "db" , "cq" , ` CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s) END ` )
2015-06-05 23:19:44 +00:00
ms . CreateDatabase ( "db2" , "default" )
2015-06-15 13:36:43 +00:00
ms . CreateContinuousQuery ( "db2" , "cq2" , ` CREATE CONTINUOUS QUERY cq2 ON db2 BEGIN SELECT mean(value) INTO cpu_mean FROM cpu WHERE time > now() - 10m GROUP BY time(1m) END ` )
2015-08-27 23:23:21 +00:00
ms . CreateDatabase ( "db3" , "default" )
ms . CreateContinuousQuery ( "db3" , "cq3" , ` CREATE CONTINUOUS QUERY cq3 ON db3 BEGIN SELECT mean(value) INTO "1hAverages".:MEASUREMENT FROM /cpu[0-9]?/ GROUP BY time(10s) END ` )
2015-06-05 23:19:44 +00:00
2015-06-05 06:00:54 +00:00
return s
}
2016-01-20 00:04:42 +00:00
// MetaClient is a mock meta store.
type MetaClient struct {
2015-06-06 00:23:44 +00:00
mu sync . RWMutex
2015-06-05 06:00:54 +00:00
Leader bool
2016-01-20 23:36:51 +00:00
AllowLease bool
2015-06-05 06:00:54 +00:00
DatabaseInfos [ ] meta . DatabaseInfo
Err error
2015-06-05 18:40:22 +00:00
t * testing . T
2016-01-20 23:36:51 +00:00
nodeID uint64
2015-06-05 06:00:54 +00:00
}
2016-01-20 00:04:42 +00:00
// NewMetaClient returns a *MetaClient.
func NewMetaClient ( t * testing . T ) * MetaClient {
return & MetaClient {
2016-01-20 23:36:51 +00:00
Leader : true ,
AllowLease : true ,
t : t ,
nodeID : 1 ,
2015-06-05 06:00:54 +00:00
}
}
2016-01-20 23:36:51 +00:00
// NodeID returns the client's node ID.
func ( ms * MetaClient ) NodeID ( ) uint64 { return ms . nodeID }
// AcquireLease attempts to acquire the specified lease.
func ( ms * MetaClient ) AcquireLease ( name string ) ( l * meta . Lease , err error ) {
if ms . Leader {
if ms . AllowLease {
return & meta . Lease { Name : name } , nil
}
return nil , errors . New ( "another node owns the lease" )
}
return nil , meta . ErrServiceUnavailable
2015-06-06 00:23:44 +00:00
}
2015-06-05 06:00:54 +00:00
2016-05-11 16:32:56 +00:00
// Databases returns a list of database info about each database in the coordinator.
2016-04-30 22:04:38 +00:00
func ( ms * MetaClient ) Databases ( ) [ ] meta . DatabaseInfo {
2015-06-06 00:23:44 +00:00
ms . mu . RLock ( )
2015-06-06 02:30:57 +00:00
defer ms . mu . RUnlock ( )
2016-04-30 22:04:38 +00:00
return ms . DatabaseInfos
2015-06-06 00:23:44 +00:00
}
2015-06-05 06:00:54 +00:00
2015-06-05 23:19:44 +00:00
// Database returns a single database by name.
2016-04-30 22:04:38 +00:00
func ( ms * MetaClient ) Database ( name string ) * meta . DatabaseInfo {
2015-06-06 00:23:44 +00:00
ms . mu . RLock ( )
defer ms . mu . RUnlock ( )
return ms . database ( name )
}
2016-04-30 22:04:38 +00:00
func ( ms * MetaClient ) database ( name string ) * meta . DatabaseInfo {
2015-06-05 23:19:44 +00:00
if ms . Err != nil {
2016-04-30 22:04:38 +00:00
return nil
2015-06-05 23:19:44 +00:00
}
for i := range ms . DatabaseInfos {
if ms . DatabaseInfos [ i ] . Name == name {
2016-04-30 22:04:38 +00:00
return & ms . DatabaseInfos [ i ]
2015-06-05 23:19:44 +00:00
}
}
2016-04-30 22:04:38 +00:00
return nil
2015-06-05 23:19:44 +00:00
}
// CreateDatabase adds a new database to the meta store.
2016-01-20 00:04:42 +00:00
func ( ms * MetaClient ) CreateDatabase ( name , defaultRetentionPolicy string ) error {
2015-06-06 00:23:44 +00:00
ms . mu . Lock ( )
defer ms . mu . Unlock ( )
2015-06-05 23:19:44 +00:00
if ms . Err != nil {
return ms . Err
}
// See if the database already exists.
for _ , dbi := range ms . DatabaseInfos {
if dbi . Name == name {
return fmt . Errorf ( "database already exists: %s" , name )
}
}
// Create database.
ms . DatabaseInfos = append ( ms . DatabaseInfos , meta . DatabaseInfo {
Name : name ,
DefaultRetentionPolicy : defaultRetentionPolicy ,
} )
return nil
}
// CreateContinuousQuery adds a CQ to the meta store.
2016-01-20 00:04:42 +00:00
func ( ms * MetaClient ) CreateContinuousQuery ( database , name , query string ) error {
2015-06-06 00:23:44 +00:00
ms . mu . Lock ( )
defer ms . mu . Unlock ( )
2015-06-05 23:19:44 +00:00
if ms . Err != nil {
return ms . Err
}
2016-04-30 22:04:38 +00:00
dbi := ms . database ( database )
if dbi == nil {
2015-06-05 23:19:44 +00:00
return fmt . Errorf ( "database not found: %s" , database )
}
// See if CQ already exists.
for _ , cqi := range dbi . ContinuousQueries {
if cqi . Name == name {
return fmt . Errorf ( "continuous query already exists: %s" , name )
}
}
// Create a new CQ and store it.
dbi . ContinuousQueries = append ( dbi . ContinuousQueries , meta . ContinuousQueryInfo {
Name : name ,
Query : query ,
} )
return nil
}
2016-03-31 22:12:29 +00:00
// StatementExecutor is a mock statement executor.
type StatementExecutor struct {
2016-06-07 17:28:33 +00:00
ExecuteStatementFn func ( stmt influxql . Statement , ctx influxql . ExecutionContext ) error
2015-06-05 06:00:54 +00:00
}
2016-06-07 17:28:33 +00:00
func ( e * StatementExecutor ) ExecuteStatement ( stmt influxql . Statement , ctx influxql . ExecutionContext ) error {
2016-03-31 22:12:29 +00:00
return e . ExecuteStatementFn ( stmt , ctx )
}
2015-06-05 06:00:54 +00:00
2015-06-05 18:40:22 +00:00
func wait ( c chan struct { } , d time . Duration ) ( err error ) {
select {
case <- c :
case <- time . After ( d ) :
err = errors . New ( "timed out" )
}
return
}