2015-06-05 06:00:54 +00:00
package continuous_querier
import (
"errors"
"fmt"
2015-06-15 13:36:43 +00:00
"io/ioutil"
2015-06-05 06:00:54 +00:00
"log"
2015-06-06 00:23:44 +00:00
"sync"
2015-06-05 06:00:54 +00:00
"testing"
"time"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
)
// Sentinel errors shared by the tests and mocks in this file.
var (
// expectedErr is injected into mocks by tests that expect a failure to surface.
expectedErr = errors . New ( "expected error" )
// unexpectedErr is returned by callbacks that the tests expect never to run.
unexpectedErr = errors . New ( "unexpected error" )
)
// Test closing never opened, open, open already open, close, and close already closed.
func TestOpenAndClose ( t * testing . T ) {
2015-06-05 18:40:22 +00:00
s := NewTestService ( t )
2015-06-05 06:00:54 +00:00
if err := s . Close ( ) ; err != nil {
t . Error ( err )
} else if err = s . Open ( ) ; err != nil {
t . Error ( err )
} else if err = s . Open ( ) ; err != nil {
t . Error ( err )
} else if err = s . Close ( ) ; err != nil {
t . Error ( err )
} else if err = s . Close ( ) ; err != nil {
t . Error ( err )
}
}
// Test ExecuteContinuousQuery happy path.
func TestExecuteContinuousQuery_HappyPath ( t * testing . T ) {
2015-06-05 18:40:22 +00:00
s := NewTestService ( t )
2015-06-05 06:00:54 +00:00
dbis , _ := s . MetaStore . Databases ( )
dbi := dbis [ 0 ]
cqi := dbi . ContinuousQueries [ 0 ]
2015-06-05 18:40:22 +00:00
pointCnt := 100
2015-06-05 06:00:54 +00:00
qe := s . QueryExecutor . ( * QueryExecutor )
qe . Results = [ ] * influxql . Result { genResult ( 1 , pointCnt ) }
pw := s . PointsWriter . ( * PointsWriter )
pw . WritePointsFn = func ( p * cluster . WritePointsRequest ) error {
if len ( p . Points ) != pointCnt {
return fmt . Errorf ( "exp = %d, got = %d" , pointCnt , len ( p . Points ) )
}
return nil
}
err := s . ExecuteContinuousQuery ( & dbi , & cqi )
if err != nil {
t . Error ( err )
}
}
// Test the service happy path.
func TestService_HappyPath ( t * testing . T ) {
2015-06-05 18:40:22 +00:00
s := NewTestService ( t )
2015-06-05 06:00:54 +00:00
2015-06-05 18:40:22 +00:00
pointCnt := 100
2015-06-05 06:00:54 +00:00
qe := s . QueryExecutor . ( * QueryExecutor )
qe . Results = [ ] * influxql . Result { genResult ( 1 , pointCnt ) }
pw := s . PointsWriter . ( * PointsWriter )
2015-06-05 23:40:42 +00:00
ch := make ( chan int , 5 )
defer close ( ch )
2015-06-05 06:00:54 +00:00
pw . WritePointsFn = func ( p * cluster . WritePointsRequest ) error {
2015-06-05 23:40:42 +00:00
ch <- len ( p . Points )
2015-06-05 06:00:54 +00:00
return nil
}
s . Open ( )
2015-06-05 23:40:42 +00:00
if cnt , err := waitInt ( ch , time . Second ) ; err != nil {
2015-06-05 18:40:22 +00:00
t . Error ( err )
2015-06-05 23:40:42 +00:00
} else if cnt != pointCnt {
t . Errorf ( "exp = %d, got = %d" , pointCnt , cnt )
2015-06-05 18:40:22 +00:00
}
s . Close ( )
}
2015-06-05 23:19:44 +00:00
// TestService_Run triggers continuous queries through Service.Run, first for
// all CQs and then for a single named CQ, and checks that the query executor
// is invoked exactly the expected number of times in each case.
func TestService_Run(t *testing.T) {
	s := NewTestService(t)

	// A long RunInterval keeps the timer out of the way so that only Run
	// triggers queries.
	s.RunInterval = 10 * time.Minute

	// With no recomputation there is one ExecuteQueryFn call per CQ.
	s.Config.RecomputePreviousN = 0

	var (
		done          = make(chan struct{})
		expectCallCnt = 2
		callCnt       = 0
	)

	// Count ExecuteQuery calls and signal once the expected count is reached.
	qe := s.QueryExecutor.(*QueryExecutor)
	qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) {
		callCnt++
		if callCnt >= expectCallCnt {
			done <- struct{}{}
		}
		return nil, nil
	}

	// runAndVerify opens the service, triggers the given CQ selection, and
	// checks that exactly one completion signal arrives before closing.
	runAndVerify := func(database, name string) {
		s.Open()
		s.Run(database, name)

		// The first wait must succeed.
		if err := wait(done, 100*time.Millisecond); err != nil {
			t.Error(err)
		}
		// The second wait must time out; another signal means an extra query ran.
		if err := wait(done, 100*time.Millisecond); err == nil {
			t.Error("too many queries executed")
		}
		s.Close()
	}

	// Run every CQ.
	runAndVerify("", "")

	// Now run just one query.
	expectCallCnt, callCnt = 1, 0
	runAndVerify("db", "cq")
}
2015-06-05 18:40:22 +00:00
// TestService_NotLeader verifies that continuous queries are not executed when
// the meta store reports that this node is not the cluster leader.
func TestService_NotLeader(t *testing.T) {
	s := NewTestService(t)

	// Set RunInterval high so the run is triggered only through RunCh below.
	s.RunInterval = 10 * time.Second
	s.MetaStore.(*MetaStore).Leader = false

	// The callback signals on done; it should never fire on a non-leader.
	done := make(chan struct{})
	qe := s.QueryExecutor.(*QueryExecutor)
	qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) {
		done <- struct{}{}
		return nil, unexpectedErr
	}

	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	// Trigger service to run CQs.
	s.RunCh <- struct{}{}

	// A timeout is the expected outcome; a nil error means the callback ran.
	// Report a real message here: the error value is nil on this branch.
	if err := wait(done, 100*time.Millisecond); err == nil {
		t.Error("ExecuteQuery was called although the node is not the leader")
	}
	s.Close()
}
// Test service behavior when meta store fails to get databases.
func TestService_MetaStoreFailsToGetDatabases ( t * testing . T ) {
s := NewTestService ( t )
// Set RunInterval high so we can test triggering with the RunCh below.
s . RunInterval = 10 * time . Second
s . MetaStore . ( * MetaStore ) . Err = expectedErr
done := make ( chan struct { } )
qe := s . QueryExecutor . ( * QueryExecutor )
// Set ExecuteQuery callback, which shouldn't get called because of meta store failure.
qe . ExecuteQueryFn = func ( query * influxql . Query , database string , chunkSize int ) ( <- chan * influxql . Result , error ) {
done <- struct { } { }
return nil , unexpectedErr
}
s . Open ( )
// Trigger service to run CQs.
s . RunCh <- struct { } { }
// Expect timeout error because ExecuteQuery callback wasn't called.
if err := wait ( done , 100 * time . Millisecond ) ; err == nil {
t . Error ( err )
}
2015-06-05 06:00:54 +00:00
s . Close ( )
}
// Test ExecuteContinuousQuery with invalid queries.
func TestExecuteContinuousQuery_InvalidQueries ( t * testing . T ) {
2015-06-05 18:40:22 +00:00
s := NewTestService ( t )
2015-06-05 06:00:54 +00:00
dbis , _ := s . MetaStore . Databases ( )
dbi := dbis [ 0 ]
cqi := dbi . ContinuousQueries [ 0 ]
cqi . Query = ` this is not a query `
err := s . ExecuteContinuousQuery ( & dbi , & cqi )
if err == nil {
t . Error ( "expected error but got nil" )
}
// Valid query but invalid continuous query.
cqi . Query = ` SELECT * FROM cpu `
err = s . ExecuteContinuousQuery ( & dbi , & cqi )
if err == nil {
t . Error ( "expected error but got nil" )
}
// Group by requires aggregate.
2015-06-05 18:40:22 +00:00
cqi . Query = ` SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s) `
2015-06-05 06:00:54 +00:00
err = s . ExecuteContinuousQuery ( & dbi , & cqi )
if err == nil {
t . Error ( "expected error but got nil" )
}
}
// Test ExecuteContinuousQuery when QueryExecutor returns an error.
func TestExecuteContinuousQuery_QueryExecutor_Error ( t * testing . T ) {
2015-06-05 18:40:22 +00:00
s := NewTestService ( t )
2015-06-05 06:00:54 +00:00
qe := s . QueryExecutor . ( * QueryExecutor )
qe . Err = expectedErr
dbis , _ := s . MetaStore . Databases ( )
dbi := dbis [ 0 ]
cqi := dbi . ContinuousQueries [ 0 ]
err := s . ExecuteContinuousQuery ( & dbi , & cqi )
if err != expectedErr {
t . Errorf ( "exp = %s, got = %v" , expectedErr , err )
}
}
// NewTestService returns a new *Service with default mock object members.
2015-06-05 18:40:22 +00:00
func NewTestService ( t * testing . T ) * Service {
2015-06-05 06:00:54 +00:00
s := NewService ( NewConfig ( ) )
2015-06-05 23:19:44 +00:00
ms := NewMetaStore ( t )
s . MetaStore = ms
2015-06-05 18:40:22 +00:00
s . QueryExecutor = NewQueryExecutor ( t )
s . PointsWriter = NewPointsWriter ( t )
s . RunInterval = time . Millisecond
2015-06-05 06:00:54 +00:00
// Set Logger to write to dev/null so stdout isn't polluted.
2015-06-15 13:36:43 +00:00
if ! testing . Verbose ( ) {
s . Logger = log . New ( ioutil . Discard , "" , log . LstdFlags )
}
2015-06-05 06:00:54 +00:00
2015-06-05 23:19:44 +00:00
// Add a couple test databases and CQs.
ms . CreateDatabase ( "db" , "rp" )
2015-06-15 13:36:43 +00:00
ms . CreateContinuousQuery ( "db" , "cq" , ` CREATE CONTINUOUS QUERY cq ON db BEGIN SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s) END ` )
2015-06-05 23:19:44 +00:00
ms . CreateDatabase ( "db2" , "default" )
2015-06-15 13:36:43 +00:00
ms . CreateContinuousQuery ( "db2" , "cq2" , ` CREATE CONTINUOUS QUERY cq2 ON db2 BEGIN SELECT mean(value) INTO cpu_mean FROM cpu WHERE time > now() - 10m GROUP BY time(1m) END ` )
2015-06-05 23:19:44 +00:00
2015-06-05 06:00:54 +00:00
return s
}
// MetaStore is a mock meta store.
type MetaStore struct {
2015-06-06 00:23:44 +00:00
mu sync . RWMutex
2015-06-05 06:00:54 +00:00
Leader bool
DatabaseInfos [ ] meta . DatabaseInfo
Err error
2015-06-05 18:40:22 +00:00
t * testing . T
2015-06-05 06:00:54 +00:00
}
// NewMetaStore returns a *MetaStore.
2015-06-05 18:40:22 +00:00
func NewMetaStore ( t * testing . T ) * MetaStore {
2015-06-05 06:00:54 +00:00
return & MetaStore {
Leader : true ,
2015-06-05 23:19:44 +00:00
t : t ,
2015-06-05 06:00:54 +00:00
}
}
// IsLeader returns true if the node is the cluster leader.
2015-06-06 00:23:44 +00:00
func ( ms * MetaStore ) IsLeader ( ) bool {
ms . mu . RLock ( )
defer ms . mu . RUnlock ( )
return ms . Leader
}
2015-06-05 06:00:54 +00:00
// Databases returns a list of database info about each database in the cluster.
2015-06-06 00:23:44 +00:00
func ( ms * MetaStore ) Databases ( ) ( [ ] meta . DatabaseInfo , error ) {
ms . mu . RLock ( )
2015-06-06 02:30:57 +00:00
defer ms . mu . RUnlock ( )
2015-06-06 00:23:44 +00:00
return ms . DatabaseInfos , ms . Err
}
2015-06-05 06:00:54 +00:00
2015-06-05 23:19:44 +00:00
// Database returns a single database by name.
func ( ms * MetaStore ) Database ( name string ) ( * meta . DatabaseInfo , error ) {
2015-06-06 00:23:44 +00:00
ms . mu . RLock ( )
defer ms . mu . RUnlock ( )
return ms . database ( name )
}
func ( ms * MetaStore ) database ( name string ) ( * meta . DatabaseInfo , error ) {
2015-06-05 23:19:44 +00:00
if ms . Err != nil {
return nil , ms . Err
}
for i := range ms . DatabaseInfos {
if ms . DatabaseInfos [ i ] . Name == name {
return & ms . DatabaseInfos [ i ] , nil
}
}
return nil , fmt . Errorf ( "database not found: %s" , name )
}
// CreateDatabase adds a new database to the meta store.
func ( ms * MetaStore ) CreateDatabase ( name , defaultRetentionPolicy string ) error {
2015-06-06 00:23:44 +00:00
ms . mu . Lock ( )
defer ms . mu . Unlock ( )
2015-06-05 23:19:44 +00:00
if ms . Err != nil {
return ms . Err
}
// See if the database already exists.
for _ , dbi := range ms . DatabaseInfos {
if dbi . Name == name {
return fmt . Errorf ( "database already exists: %s" , name )
}
}
// Create database.
ms . DatabaseInfos = append ( ms . DatabaseInfos , meta . DatabaseInfo {
Name : name ,
DefaultRetentionPolicy : defaultRetentionPolicy ,
} )
return nil
}
// CreateContinuousQuery adds a CQ to the meta store.
func ( ms * MetaStore ) CreateContinuousQuery ( database , name , query string ) error {
2015-06-06 00:23:44 +00:00
ms . mu . Lock ( )
defer ms . mu . Unlock ( )
2015-06-05 23:19:44 +00:00
if ms . Err != nil {
return ms . Err
}
2015-06-06 00:23:44 +00:00
dbi , err := ms . database ( database )
2015-06-05 23:19:44 +00:00
if err != nil {
return err
} else if dbi == nil {
return fmt . Errorf ( "database not found: %s" , database )
}
// See if CQ already exists.
for _ , cqi := range dbi . ContinuousQueries {
if cqi . Name == name {
return fmt . Errorf ( "continuous query already exists: %s" , name )
}
}
// Create a new CQ and store it.
dbi . ContinuousQueries = append ( dbi . ContinuousQueries , meta . ContinuousQueryInfo {
Name : name ,
Query : query ,
} )
return nil
}
2015-06-05 06:00:54 +00:00
// QueryExecutor is a mock query executor.
type QueryExecutor struct {
2015-06-05 18:40:22 +00:00
ExecuteQueryFn func ( query * influxql . Query , database string , chunkSize int ) ( <- chan * influxql . Result , error )
2015-06-05 06:00:54 +00:00
Results [ ] * influxql . Result
ResultInterval time . Duration
Err error
ErrAfterResult int
StopRespondingAfter int
2015-06-05 18:40:22 +00:00
t * testing . T
2015-06-05 06:00:54 +00:00
}
// NewQueryExecutor returns a *QueryExecutor.
2015-06-05 18:40:22 +00:00
func NewQueryExecutor ( t * testing . T ) * QueryExecutor {
2015-06-05 06:00:54 +00:00
return & QueryExecutor {
ErrAfterResult : - 1 ,
StopRespondingAfter : - 1 ,
2015-06-05 18:40:22 +00:00
t : t ,
2015-06-05 06:00:54 +00:00
}
}
// ExecuteQuery returns a channel that the caller can read query results from.
func ( qe * QueryExecutor ) ExecuteQuery ( query * influxql . Query , database string , chunkSize int ) ( <- chan * influxql . Result , error ) {
2015-06-05 18:40:22 +00:00
// If the test set a callback, call it.
2015-06-05 06:00:54 +00:00
if qe . ExecuteQueryFn != nil {
2015-06-05 18:40:22 +00:00
if _ , err := qe . ExecuteQueryFn ( query , database , chunkSize ) ; err != nil {
2015-06-05 06:00:54 +00:00
return nil , err
}
}
// Are we supposed to error immediately?
if qe . ErrAfterResult == - 1 && qe . Err != nil {
return nil , qe . Err
}
2015-06-05 18:40:22 +00:00
ch := make ( chan * influxql . Result )
2015-06-05 06:00:54 +00:00
// Start a go routine to send results and / or error.
go func ( ) {
2015-06-05 18:40:22 +00:00
n := 0
2015-06-05 06:00:54 +00:00
for i , r := range qe . Results {
2015-06-05 18:40:22 +00:00
if i == qe . ErrAfterResult - 1 {
qe . t . Logf ( "ExecuteQuery(): ErrAfterResult %d" , qe . ErrAfterResult - 1 )
ch <- & influxql . Result { Err : qe . Err }
close ( ch )
2015-06-05 06:00:54 +00:00
return
} else if i == qe . StopRespondingAfter {
2015-06-05 18:40:22 +00:00
qe . t . Log ( "ExecuteQuery(): StopRespondingAfter" )
2015-06-05 06:00:54 +00:00
return
}
2015-06-05 18:40:22 +00:00
ch <- r
n ++
2015-06-05 06:00:54 +00:00
time . Sleep ( qe . ResultInterval )
}
2015-06-05 18:40:22 +00:00
qe . t . Logf ( "ExecuteQuery(): all (%d) results sent" , n )
close ( ch )
2015-06-05 06:00:54 +00:00
} ( )
2015-06-05 18:40:22 +00:00
return ch , nil
2015-06-05 06:00:54 +00:00
}
// PointsWriter is a mock points writer.
type PointsWriter struct {
WritePointsFn func ( p * cluster . WritePointsRequest ) error
Err error
PointsPerSecond int
2015-06-05 18:40:22 +00:00
t * testing . T
2015-06-05 06:00:54 +00:00
}
// NewPointsWriter returns a new *PointsWriter.
2015-06-05 18:40:22 +00:00
func NewPointsWriter ( t * testing . T ) * PointsWriter {
2015-06-05 06:00:54 +00:00
return & PointsWriter {
PointsPerSecond : 25000 ,
2015-06-05 18:40:22 +00:00
t : t ,
2015-06-05 06:00:54 +00:00
}
}
2015-06-05 18:40:22 +00:00
// WritePoints mocks writing points.
2015-06-05 06:00:54 +00:00
func ( pw * PointsWriter ) WritePoints ( p * cluster . WritePointsRequest ) error {
2015-06-05 18:40:22 +00:00
// If the test set a callback, call it.
2015-06-05 06:00:54 +00:00
if pw . WritePointsFn != nil {
if err := pw . WritePointsFn ( p ) ; err != nil {
return err
}
}
if pw . Err != nil {
return pw . Err
}
ns := time . Duration ( ( 1 / pw . PointsPerSecond ) * 1000000000 )
time . Sleep ( ns )
return nil
}
// genResult generates a dummy query result holding rowCnt rows named "cpu",
// each with valCnt [time, value] pairs. Timestamps start at the current time
// and advance by one second per generated value across all rows.
func genResult(rowCnt, valCnt int) *influxql.Result {
	rows := make(influxql.Rows, 0, rowCnt)
	now := time.Now()
	for n := 0; n < rowCnt; n++ {
		vals := make([][]interface{}, 0, valCnt)
		for m := 0; m < valCnt; m++ {
			vals = append(vals, []interface{}{now, float64(m)})
			// Time.Add returns a new value; it must be assigned back or
			// every point ends up with the same timestamp.
			now = now.Add(time.Second)
		}
		row := &influxql.Row{
			Name:    "cpu",
			Tags:    map[string]string{"host": "server01"},
			Columns: []string{"time", "value"},
			Values:  vals,
		}
		rows = append(rows, row)
	}
	return &influxql.Result{
		Series: rows,
	}
}
2015-06-05 18:40:22 +00:00
// wait blocks until a value is received from c or d elapses. It returns nil
// when the receive happens and a "timed out" error otherwise.
func wait(c chan struct{}, d time.Duration) (err error) {
	timeout := time.After(d)
	select {
	case <-c:
		return nil
	case <-timeout:
		return errors.New("timed out")
	}
}
2015-06-05 23:19:44 +00:00
2015-06-05 23:40:42 +00:00
// waitInt blocks until an int is received from c or d elapses. It returns the
// received value with a nil error, or zero with a "timed out" error.
func waitInt(c chan int, d time.Duration) (i int, err error) {
	select {
	case v := <-c:
		return v, nil
	case <-time.After(d):
		return 0, errors.New("timed out")
	}
}
2015-06-05 23:19:44 +00:00
// check panics with err when err is non-nil and does nothing otherwise.
func check(err error) {
	if err == nil {
		return
	}
	panic(err)
}