2016-05-11 16:32:56 +00:00
package coordinator_test
2016-02-19 20:38:02 +00:00
import (
"bytes"
2016-03-23 15:05:38 +00:00
"errors"
2016-02-19 20:38:02 +00:00
"io"
"os"
"reflect"
2016-11-23 20:32:42 +00:00
"regexp"
2016-02-19 20:38:02 +00:00
"testing"
"time"
"github.com/davecgh/go-spew/spew"
2016-05-11 16:32:56 +00:00
"github.com/influxdata/influxdb/coordinator"
2016-02-19 20:38:02 +00:00
"github.com/influxdata/influxdb/influxql"
2016-10-27 22:17:08 +00:00
"github.com/influxdata/influxdb/internal"
2016-02-19 20:38:02 +00:00
"github.com/influxdata/influxdb/models"
2017-08-15 19:24:22 +00:00
"github.com/influxdata/influxdb/query"
2016-02-19 20:38:02 +00:00
"github.com/influxdata/influxdb/services/meta"
2016-06-06 19:53:54 +00:00
"github.com/influxdata/influxdb/tsdb"
2017-02-17 23:17:22 +00:00
"github.com/uber-go/zap"
2016-02-19 20:38:02 +00:00
)
// Fixture names shared by the tests in this file.
const (
	// DefaultDatabase is the default database name used in tests.
	DefaultDatabase = "db0"

	// DefaultRetentionPolicy is the default retention policy name used in tests.
	DefaultRetentionPolicy = "rp0"
)
// Ensure query executor can execute a simple SELECT statement.
func TestQueryExecutor_ExecuteQuery_SelectStatement ( t * testing . T ) {
e := DefaultQueryExecutor ( )
// The meta client should return a single shard owned by the local node.
2016-11-23 20:32:42 +00:00
e . MetaClient . ShardGroupsByTimeRangeFn = func ( database , policy string , min , max time . Time ) ( a [ ] meta . ShardGroupInfo , err error ) {
return [ ] meta . ShardGroupInfo {
{ ID : 1 , Shards : [ ] meta . ShardInfo {
{ ID : 100 , Owners : [ ] meta . ShardOwner { { NodeID : 0 } } } ,
} } ,
} , nil
2016-02-19 20:38:02 +00:00
}
// The TSDB store should return an IteratorCreator for shard.
// This IteratorCreator returns a single iterator with "value" in the aux fields.
2016-11-23 20:32:42 +00:00
e . TSDBStore . ShardGroupFn = func ( ids [ ] uint64 ) tsdb . ShardGroup {
if ! reflect . DeepEqual ( ids , [ ] uint64 { 100 } ) {
t . Fatalf ( "unexpected shard ids: %v" , ids )
2016-02-19 20:38:02 +00:00
}
2016-11-23 20:32:42 +00:00
var sh MockShard
2017-08-15 19:24:22 +00:00
sh . CreateIteratorFn = func ( m string , opt query . IteratorOptions ) ( query . Iterator , error ) {
return & FloatIterator { Points : [ ] query . FloatPoint {
2016-02-19 20:38:02 +00:00
{ Name : "cpu" , Time : int64 ( 0 * time . Second ) , Aux : [ ] interface { } { float64 ( 100 ) } } ,
{ Name : "cpu" , Time : int64 ( 1 * time . Second ) , Aux : [ ] interface { } { float64 ( 200 ) } } ,
} } , nil
}
2016-11-23 20:32:42 +00:00
sh . FieldDimensionsFn = func ( measurements [ ] string ) ( fields map [ string ] influxql . DataType , dimensions map [ string ] struct { } , err error ) {
if ! reflect . DeepEqual ( measurements , [ ] string { "cpu" } ) {
t . Fatalf ( "unexpected source: %#v" , measurements )
}
2016-05-16 16:08:28 +00:00
return map [ string ] influxql . DataType { "value" : influxql . Float } , nil , nil
2016-02-19 20:38:02 +00:00
}
2016-11-23 20:32:42 +00:00
return & sh
2016-02-19 20:38:02 +00:00
}
// Verify all results from the query.
2017-08-15 19:24:22 +00:00
if a := ReadAllResults ( e . ExecuteQuery ( ` SELECT * FROM cpu ` , "db0" , 0 ) ) ; ! reflect . DeepEqual ( a , [ ] * query . Result {
2016-02-19 20:38:02 +00:00
{
StatementID : 0 ,
Series : [ ] * models . Row { {
Name : "cpu" ,
Columns : [ ] string { "time" , "value" } ,
Values : [ ] [ ] interface { } {
{ time . Unix ( 0 , 0 ) . UTC ( ) , float64 ( 100 ) } ,
{ time . Unix ( 1 , 0 ) . UTC ( ) , float64 ( 200 ) } ,
} ,
} } ,
} ,
} ) {
t . Fatalf ( "unexpected results: %s" , spew . Sdump ( a ) )
}
}
2016-03-31 01:00:29 +00:00
// Ensure query executor can enforce a maximum bucket selection count.
func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN ( t * testing . T ) {
e := DefaultQueryExecutor ( )
2016-03-31 22:12:29 +00:00
e . StatementExecutor . MaxSelectBucketsN = 3
2016-03-31 01:00:29 +00:00
// The meta client should return a single shards on the local node.
2016-11-23 20:32:42 +00:00
e . MetaClient . ShardGroupsByTimeRangeFn = func ( database , policy string , min , max time . Time ) ( a [ ] meta . ShardGroupInfo , err error ) {
return [ ] meta . ShardGroupInfo {
{ ID : 1 , Shards : [ ] meta . ShardInfo {
{ ID : 100 , Owners : [ ] meta . ShardOwner { { NodeID : 0 } } } ,
} } ,
2016-03-31 01:00:29 +00:00
} , nil
}
2016-11-23 20:32:42 +00:00
e . TSDBStore . ShardGroupFn = func ( ids [ ] uint64 ) tsdb . ShardGroup {
if ! reflect . DeepEqual ( ids , [ ] uint64 { 100 } ) {
t . Fatalf ( "unexpected shard ids: %v" , ids )
}
var sh MockShard
2017-08-15 19:24:22 +00:00
sh . CreateIteratorFn = func ( m string , opt query . IteratorOptions ) ( query . Iterator , error ) {
2016-11-23 20:32:42 +00:00
return & FloatIterator {
2017-08-15 19:24:22 +00:00
Points : [ ] query . FloatPoint { { Name : "cpu" , Time : int64 ( 0 * time . Second ) , Aux : [ ] interface { } { float64 ( 100 ) } } } ,
2016-11-23 20:32:42 +00:00
} , nil
}
sh . FieldDimensionsFn = func ( measurements [ ] string ) ( fields map [ string ] influxql . DataType , dimensions map [ string ] struct { } , err error ) {
if ! reflect . DeepEqual ( measurements , [ ] string { "cpu" } ) {
t . Fatalf ( "unexpected source: %#v" , measurements )
}
return map [ string ] influxql . DataType { "value" : influxql . Float } , nil , nil
}
return & sh
2016-03-31 01:00:29 +00:00
}
// Verify all results from the query.
2017-08-15 19:24:22 +00:00
if a := ReadAllResults ( e . ExecuteQuery ( ` SELECT count(value) FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s) ` , "db0" , 0 ) ) ; ! reflect . DeepEqual ( a , [ ] * query . Result {
2016-03-31 01:00:29 +00:00
{
StatementID : 0 ,
2016-10-28 20:44:33 +00:00
Err : errors . New ( "max-select-buckets limit exceeded: (4/3)" ) ,
2016-03-31 01:00:29 +00:00
} ,
} ) {
t . Fatalf ( "unexpected results: %s" , spew . Sdump ( a ) )
2016-03-23 15:05:38 +00:00
}
}
2016-10-27 22:17:08 +00:00
// TestStatementExecutor_NormalizeDropSeries verifies that normalizing a
// DROP SERIES statement neither consults the meta client nor fills in a
// database or retention policy on the statement's measurement source.
func TestStatementExecutor_NormalizeDropSeries(t *testing.T) {
	q, err := influxql.ParseQuery("DROP SERIES FROM cpu")
	if err != nil {
		t.Fatalf("unexpected error parsing query: %v", err)
	}

	stmt := q.Statements[0].(*influxql.DropSeriesStatement)

	s := &coordinator.StatementExecutor{
		MetaClient: &internal.MetaClientMock{
			// Any database lookup during normalization is a failure.
			DatabaseFn: func(name string) *meta.DatabaseInfo {
				t.Fatal("meta client should not be called")
				return nil
			},
		},
	}
	if err := s.NormalizeStatement(stmt, "foo"); err != nil {
		t.Fatalf("unexpected error normalizing statement: %v", err)
	}

	m := stmt.Sources[0].(*influxql.Measurement)
	if m.Database != "" {
		t.Fatalf("database rewritten when not supposed to: %v", m.Database)
	}
	if m.RetentionPolicy != "" {
		t.Fatalf("retention policy rewritten when not supposed to: %v", m.RetentionPolicy)
	}

	// The statement must still render exactly as it was written.
	if exp, got := "DROP SERIES FROM cpu", q.String(); exp != got {
		t.Fatalf("generated query does not match parsed: exp %v, got %v", exp, got)
	}
}
// TestStatementExecutor_NormalizeDeleteSeries verifies that normalizing a
// DELETE statement neither consults the meta client nor fills in a database
// or retention policy on the statement's measurement source.
func TestStatementExecutor_NormalizeDeleteSeries(t *testing.T) {
	q, err := influxql.ParseQuery("DELETE FROM cpu")
	if err != nil {
		t.Fatalf("unexpected error parsing query: %v", err)
	}

	stmt := q.Statements[0].(*influxql.DeleteSeriesStatement)

	s := &coordinator.StatementExecutor{
		MetaClient: &internal.MetaClientMock{
			// Any database lookup during normalization is a failure.
			DatabaseFn: func(name string) *meta.DatabaseInfo {
				t.Fatal("meta client should not be called")
				return nil
			},
		},
	}
	if err := s.NormalizeStatement(stmt, "foo"); err != nil {
		t.Fatalf("unexpected error normalizing statement: %v", err)
	}

	m := stmt.Sources[0].(*influxql.Measurement)
	if m.Database != "" {
		t.Fatalf("database rewritten when not supposed to: %v", m.Database)
	}
	if m.RetentionPolicy != "" {
		t.Fatalf("retention policy rewritten when not supposed to: %v", m.RetentionPolicy)
	}

	// The statement must still render exactly as it was written.
	if exp, got := "DELETE FROM cpu", q.String(); exp != got {
		t.Fatalf("generated query does not match parsed: exp %v, got %v", exp, got)
	}
}
2017-02-09 19:32:52 +00:00
type mockAuthorizer struct {
AuthorizeDatabaseFn func ( influxql . Privilege , string ) bool
}
func ( a * mockAuthorizer ) AuthorizeDatabase ( p influxql . Privilege , name string ) bool {
return a . AuthorizeDatabaseFn ( p , name )
}
2017-05-05 17:20:00 +00:00
func ( m * mockAuthorizer ) AuthorizeQuery ( database string , query * influxql . Query ) error {
panic ( "fail" )
}
2017-05-26 19:29:04 +00:00
func ( m * mockAuthorizer ) AuthorizeSeriesRead ( database string , measurement [ ] byte , tags models . Tags ) bool {
2017-05-05 17:20:00 +00:00
panic ( "fail" )
}
2017-05-26 19:29:04 +00:00
func ( m * mockAuthorizer ) AuthorizeSeriesWrite ( database string , measurement [ ] byte , tags models . Tags ) bool {
2017-05-05 17:20:00 +00:00
panic ( "fail" )
}
2017-02-09 19:32:52 +00:00
func TestQueryExecutor_ExecuteQuery_ShowDatabases ( t * testing . T ) {
2017-08-15 19:24:22 +00:00
qe := query . NewQueryExecutor ( )
2017-02-09 19:32:52 +00:00
qe . StatementExecutor = & coordinator . StatementExecutor {
MetaClient : & internal . MetaClientMock {
DatabasesFn : func ( ) [ ] meta . DatabaseInfo {
return [ ] meta . DatabaseInfo {
{ Name : "db1" } , { Name : "db2" } , { Name : "db3" } , { Name : "db4" } ,
}
} ,
} ,
}
2017-08-15 19:24:22 +00:00
opt := query . ExecutionOptions {
2017-02-09 19:32:52 +00:00
Authorizer : & mockAuthorizer {
AuthorizeDatabaseFn : func ( p influxql . Privilege , name string ) bool {
return name == "db2" || name == "db4"
} ,
} ,
}
q , err := influxql . ParseQuery ( "SHOW DATABASES" )
if err != nil {
t . Fatal ( err )
}
results := ReadAllResults ( qe . ExecuteQuery ( q , opt , make ( chan struct { } ) ) )
2017-08-15 19:24:22 +00:00
exp := [ ] * query . Result {
2017-02-09 19:32:52 +00:00
{
StatementID : 0 ,
Series : [ ] * models . Row { {
Name : "databases" ,
Columns : [ ] string { "name" } ,
Values : [ ] [ ] interface { } {
{ "db2" } , { "db4" } ,
} ,
} } ,
} ,
}
if ! reflect . DeepEqual ( results , exp ) {
t . Fatalf ( "unexpected results: exp %s, got %s" , spew . Sdump ( exp ) , spew . Sdump ( results ) )
}
}
2016-05-11 16:32:56 +00:00
// QueryExecutor is a test wrapper for coordinator.QueryExecutor.
2016-02-19 20:38:02 +00:00
type QueryExecutor struct {
2017-08-15 19:24:22 +00:00
* query . QueryExecutor
2016-02-19 20:38:02 +00:00
2016-03-31 22:12:29 +00:00
MetaClient MetaClient
TSDBStore TSDBStore
2016-05-11 16:32:56 +00:00
StatementExecutor * coordinator . StatementExecutor
2016-03-31 22:12:29 +00:00
LogOutput bytes . Buffer
2016-02-19 20:38:02 +00:00
}
// NewQueryExecutor returns a new instance of QueryExecutor.
// This query executor always has a node id of 0.
func NewQueryExecutor ( ) * QueryExecutor {
e := & QueryExecutor {
2017-08-15 19:24:22 +00:00
QueryExecutor : query . NewQueryExecutor ( ) ,
2016-02-19 20:38:02 +00:00
}
2016-05-11 16:32:56 +00:00
e . StatementExecutor = & coordinator . StatementExecutor {
2016-03-31 22:12:29 +00:00
MetaClient : & e . MetaClient ,
TSDBStore : & e . TSDBStore ,
2016-11-23 20:32:42 +00:00
ShardMapper : & coordinator . LocalShardMapper {
MetaClient : & e . MetaClient ,
TSDBStore : & e . TSDBStore ,
} ,
2016-03-31 22:12:29 +00:00
}
e . QueryExecutor . StatementExecutor = e . StatementExecutor
2016-02-19 20:38:02 +00:00
2016-04-20 20:07:08 +00:00
var out io . Writer = & e . LogOutput
2016-02-19 20:38:02 +00:00
if testing . Verbose ( ) {
2016-12-30 10:42:38 +00:00
out = io . MultiWriter ( out , os . Stderr )
2016-02-19 20:38:02 +00:00
}
2016-12-01 18:26:23 +00:00
e . QueryExecutor . WithLogger ( zap . New (
zap . NewTextEncoder ( ) ,
2016-12-30 10:42:38 +00:00
zap . Output ( zap . AddSync ( out ) ) ,
2016-12-01 18:26:23 +00:00
) )
2016-02-19 20:38:02 +00:00
return e
}
// DefaultQueryExecutor returns a QueryExecutor with a database (db0) and retention policy (rp0).
// It is NewQueryExecutor with the meta client's database lookup set to
// DefaultMetaClientDatabaseFn.
func DefaultQueryExecutor() *QueryExecutor {
	exec := NewQueryExecutor()
	exec.MetaClient.DatabaseFn = DefaultMetaClientDatabaseFn
	return exec
}
// ExecuteQuery parses query and executes against the database.
2017-08-15 19:24:22 +00:00
func ( e * QueryExecutor ) ExecuteQuery ( q , database string , chunkSize int ) <- chan * query . Result {
return e . QueryExecutor . ExecuteQuery ( MustParseQuery ( q ) , query . ExecutionOptions {
2016-06-01 17:30:50 +00:00
Database : database ,
ChunkSize : chunkSize ,
} , make ( chan struct { } ) )
2016-02-19 20:38:02 +00:00
}
2016-05-11 16:32:56 +00:00
// TSDBStore is a mockable implementation of coordinator.TSDBStore.
2016-02-19 20:38:02 +00:00
type TSDBStore struct {
2016-06-01 22:17:18 +00:00
CreateShardFn func ( database , policy string , shardID uint64 , enabled bool ) error
2016-02-19 20:38:02 +00:00
WriteToShardFn func ( shardID uint64 , points [ ] models . Point ) error
2016-04-29 00:29:09 +00:00
RestoreShardFn func ( id uint64 , r io . Reader ) error
BackupShardFn func ( id uint64 , since time . Time , w io . Writer ) error
2016-03-31 22:12:29 +00:00
DeleteDatabaseFn func ( name string ) error
DeleteMeasurementFn func ( database , name string ) error
DeleteRetentionPolicyFn func ( database , name string ) error
DeleteShardFn func ( id uint64 ) error
2016-04-29 22:31:57 +00:00
DeleteSeriesFn func ( database string , sources [ ] influxql . Source , condition influxql . Expr ) error
2016-11-23 20:32:42 +00:00
ShardGroupFn func ( ids [ ] uint64 ) tsdb . ShardGroup
2016-02-19 20:38:02 +00:00
}
2016-06-01 22:17:18 +00:00
func ( s * TSDBStore ) CreateShard ( database , policy string , shardID uint64 , enabled bool ) error {
2016-03-01 23:44:07 +00:00
if s . CreateShardFn == nil {
return nil
}
2016-06-01 22:17:18 +00:00
return s . CreateShardFn ( database , policy , shardID , enabled )
2016-02-19 20:38:02 +00:00
}
func ( s * TSDBStore ) WriteToShard ( shardID uint64 , points [ ] models . Point ) error {
return s . WriteToShardFn ( shardID , points )
}
2016-04-29 00:29:09 +00:00
func ( s * TSDBStore ) RestoreShard ( id uint64 , r io . Reader ) error {
return s . RestoreShardFn ( id , r )
}
func ( s * TSDBStore ) BackupShard ( id uint64 , since time . Time , w io . Writer ) error {
return s . BackupShardFn ( id , since , w )
}
2016-02-19 20:38:02 +00:00
func ( s * TSDBStore ) DeleteDatabase ( name string ) error {
return s . DeleteDatabaseFn ( name )
}
func ( s * TSDBStore ) DeleteMeasurement ( database , name string ) error {
return s . DeleteMeasurementFn ( database , name )
}
func ( s * TSDBStore ) DeleteRetentionPolicy ( database , name string ) error {
return s . DeleteRetentionPolicyFn ( database , name )
}
2016-03-11 15:53:15 +00:00
func ( s * TSDBStore ) DeleteShard ( id uint64 ) error {
return s . DeleteShardFn ( id )
}
2016-04-29 22:31:57 +00:00
func ( s * TSDBStore ) DeleteSeries ( database string , sources [ ] influxql . Source , condition influxql . Expr ) error {
return s . DeleteSeriesFn ( database , sources , condition )
2016-02-19 20:38:02 +00:00
}
2016-11-23 20:32:42 +00:00
func ( s * TSDBStore ) ShardGroup ( ids [ ] uint64 ) tsdb . ShardGroup {
return s . ShardGroupFn ( ids )
2016-06-06 19:53:54 +00:00
}
2016-07-28 22:38:08 +00:00
func ( s * TSDBStore ) Measurements ( database string , cond influxql . Expr ) ( [ ] string , error ) {
return nil , nil
}
2016-12-16 14:21:42 +00:00
func ( s * TSDBStore ) MeasurementNames ( database string , cond influxql . Expr ) ( [ ] [ ] byte , error ) {
return nil , nil
}
2016-07-28 22:38:08 +00:00
func ( s * TSDBStore ) TagValues ( database string , cond influxql . Expr ) ( [ ] tsdb . TagValues , error ) {
return nil , nil
}
2016-11-23 20:32:42 +00:00
type MockShard struct {
Measurements [ ] string
FieldDimensionsFn func ( measurements [ ] string ) ( fields map [ string ] influxql . DataType , dimensions map [ string ] struct { } , err error )
2017-08-15 19:24:22 +00:00
CreateIteratorFn func ( m string , opt query . IteratorOptions ) ( query . Iterator , error )
2016-11-23 20:32:42 +00:00
ExpandSourcesFn func ( sources influxql . Sources ) ( influxql . Sources , error )
}
func ( sh * MockShard ) MeasurementsByRegex ( re * regexp . Regexp ) [ ] string {
names := make ( [ ] string , 0 , len ( sh . Measurements ) )
for _ , name := range sh . Measurements {
if re . MatchString ( name ) {
names = append ( names , name )
}
}
return names
}
func ( sh * MockShard ) FieldDimensions ( measurements [ ] string ) ( fields map [ string ] influxql . DataType , dimensions map [ string ] struct { } , err error ) {
return sh . FieldDimensionsFn ( measurements )
}
func ( sh * MockShard ) MapType ( measurement , field string ) influxql . DataType {
f , d , err := sh . FieldDimensions ( [ ] string { measurement } )
if err != nil {
return influxql . Unknown
}
if typ , ok := f [ field ] ; ok {
return typ
} else if _ , ok := d [ field ] ; ok {
return influxql . Tag
}
return influxql . Unknown
}
2017-08-15 19:24:22 +00:00
func ( sh * MockShard ) CreateIterator ( measurement string , opt query . IteratorOptions ) ( query . Iterator , error ) {
2016-11-23 20:32:42 +00:00
return sh . CreateIteratorFn ( measurement , opt )
}
func ( sh * MockShard ) ExpandSources ( sources influxql . Sources ) ( influxql . Sources , error ) {
return sh . ExpandSourcesFn ( sources )
}
2016-02-19 20:38:02 +00:00
// MustParseQuery parses s into a query and panics with the parse error if
// s is not valid.
func MustParseQuery(s string) *influxql.Query {
	parsed, err := influxql.ParseQuery(s)
	if err == nil {
		return parsed
	}
	panic(err)
}
// ReadAllResults reads all results from c and returns as a slice.
2017-08-15 19:24:22 +00:00
func ReadAllResults ( c <- chan * query . Result ) [ ] * query . Result {
var a [ ] * query . Result
2016-02-19 20:38:02 +00:00
for result := range c {
a = append ( a , result )
}
return a
}
// FloatIterator is a represents an iterator that reads from a slice.
type FloatIterator struct {
2017-08-15 19:24:22 +00:00
Points [ ] query . FloatPoint
stats query . IteratorStats
2016-02-19 20:38:02 +00:00
}
2017-08-15 19:24:22 +00:00
func ( itr * FloatIterator ) Stats ( ) query . IteratorStats { return itr . stats }
func ( itr * FloatIterator ) Close ( ) error { return nil }
2016-02-19 20:38:02 +00:00
// Next returns the next value and shifts it off the beginning of the points slice.
2017-08-15 19:24:22 +00:00
func ( itr * FloatIterator ) Next ( ) ( * query . FloatPoint , error ) {
2016-02-19 20:38:02 +00:00
if len ( itr . Points ) == 0 {
2016-04-17 20:00:59 +00:00
return nil , nil
2016-02-19 20:38:02 +00:00
}
v := & itr . Points [ 0 ]
itr . Points = itr . Points [ 1 : ]
2016-04-17 20:00:59 +00:00
return v , nil
2016-02-19 20:38:02 +00:00
}