2014-10-22 05:32:19 +00:00
package influxdb_test
import (
2015-02-06 17:03:34 +00:00
"bytes"
2014-12-24 04:46:54 +00:00
"encoding/json"
2014-10-22 05:32:19 +00:00
"fmt"
"io/ioutil"
2015-01-29 02:30:00 +00:00
"log"
2014-12-29 23:12:51 +00:00
"net/url"
2014-10-22 05:32:19 +00:00
"os"
2014-12-23 06:18:05 +00:00
"reflect"
2015-01-14 23:44:09 +00:00
"strings"
2014-10-22 05:32:19 +00:00
"testing"
2014-11-10 02:55:53 +00:00
"time"
2014-10-22 05:32:19 +00:00
"github.com/influxdb/influxdb"
2015-01-14 23:44:09 +00:00
"github.com/influxdb/influxdb/influxql"
2014-10-22 05:32:19 +00:00
"github.com/influxdb/influxdb/messaging"
2015-02-10 15:54:19 +00:00
"golang.org/x/crypto/bcrypt"
2014-10-22 05:32:19 +00:00
)
// Ensure the server can be successfully opened and closed.
func TestServer_Open ( t * testing . T ) {
2014-12-30 22:46:50 +00:00
s := NewServer ( )
2014-10-22 05:32:19 +00:00
defer s . Close ( )
2014-10-24 00:54:12 +00:00
if err := s . Server . Open ( tempfile ( ) ) ; err != nil {
t . Fatal ( err )
}
if err := s . Server . Close ( ) ; err != nil {
t . Fatal ( err )
}
2014-10-22 05:32:19 +00:00
}
2014-10-25 15:17:08 +00:00
// Ensure an error is returned when opening an already open server.
func TestServer_Open_ErrServerOpen ( t * testing . T ) { t . Skip ( "pending" ) }
// Ensure an error is returned when opening a server without a path.
func TestServer_Open_ErrPathRequired ( t * testing . T ) { t . Skip ( "pending" ) }
2014-12-29 23:12:51 +00:00
// Ensure the server can create a new data node.
2014-12-30 15:50:15 +00:00
func TestServer_CreateDataNode ( t * testing . T ) {
2014-12-29 23:12:51 +00:00
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create a new node.
u , _ := url . Parse ( "http://localhost:80000" )
2014-12-30 15:50:15 +00:00
if err := s . CreateDataNode ( u ) ; err != nil {
2014-12-29 23:12:51 +00:00
t . Fatal ( err )
}
s . Restart ( )
// Verify that the node exists.
2014-12-30 15:50:15 +00:00
if n := s . DataNodeByURL ( u ) ; n == nil {
t . Fatalf ( "data node not found" )
2014-12-29 23:12:51 +00:00
} else if n . URL . String ( ) != "http://localhost:80000" {
t . Fatalf ( "unexpected url: %s" , n . URL )
} else if n . ID == 0 {
t . Fatalf ( "unexpected id: %d" , n . ID )
}
}
// Ensure the server returns an error when creating a duplicate node.
2014-12-30 15:50:15 +00:00
func TestServer_CreateDatabase_ErrDataNodeExists ( t * testing . T ) {
2014-12-29 23:12:51 +00:00
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create a node with the same URL twice.
u , _ := url . Parse ( "http://localhost:80000" )
2014-12-30 15:50:15 +00:00
if err := s . CreateDataNode ( u ) ; err != nil {
2014-12-29 23:12:51 +00:00
t . Fatal ( err )
}
2014-12-30 15:50:15 +00:00
if err := s . CreateDataNode ( u ) ; err != influxdb . ErrDataNodeExists {
2014-12-29 23:12:51 +00:00
t . Fatal ( err )
}
}
// Ensure the server can delete a node.
2014-12-30 15:50:15 +00:00
func TestServer_DeleteDataNode ( t * testing . T ) {
2014-12-29 23:12:51 +00:00
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
2014-12-30 15:50:15 +00:00
// Create a data node and verify it exists.
2014-12-29 23:12:51 +00:00
u , _ := url . Parse ( "http://localhost:80000" )
2014-12-30 15:50:15 +00:00
if err := s . CreateDataNode ( u ) ; err != nil {
2014-12-29 23:12:51 +00:00
t . Fatal ( err )
2014-12-30 15:50:15 +00:00
} else if s . DataNodeByURL ( u ) == nil {
t . Fatalf ( "data node not actually created" )
2014-12-29 23:12:51 +00:00
}
s . Restart ( )
// Drop the node and verify that it's gone.
2014-12-30 15:50:15 +00:00
n := s . DataNodeByURL ( u )
if err := s . DeleteDataNode ( n . ID ) ; err != nil {
2014-12-29 23:12:51 +00:00
t . Fatal ( err )
2014-12-30 15:50:15 +00:00
} else if s . DataNode ( n . ID ) != nil {
t . Fatalf ( "data node not actually dropped" )
2014-12-29 23:12:51 +00:00
}
}
2015-02-06 17:03:34 +00:00
// Test unuathorized requests logging
func TestServer_UnauthorizedRequests ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . SetAuthenticationEnabled ( true )
var b bytes . Buffer
s . SetLogOutput ( & b )
adminOnlyQuery := & influxql . Query {
Statements : [ ] influxql . Statement {
& influxql . DropDatabaseStatement { Name : "foo" } ,
} ,
}
e := s . Authorize ( nil , adminOnlyQuery , "foo" )
if _ , ok := e . ( influxdb . ErrAuthorize ) ; ! ok {
t . Fatalf ( "unexpected error. expected %v, actual: %v" , influxdb . ErrAuthorize { } , e )
}
if ! strings . Contains ( b . String ( ) , "unauthorized request" ) {
t . Log ( b . String ( ) )
t . Fatalf ( ` log should contain "unuathorized request" ` )
}
b . Reset ( )
// Create normal database user.
s . CreateUser ( "user1" , "user1" , false )
user1 := s . User ( "user1" )
e = s . Authorize ( user1 , adminOnlyQuery , "foo" )
if _ , ok := e . ( influxdb . ErrAuthorize ) ; ! ok {
t . Fatalf ( "unexpected error. expected %v, actual: %v" , influxdb . ErrAuthorize { } , e )
}
if ! strings . Contains ( b . String ( ) , "unauthorized request" ) {
t . Log ( b . String ( ) )
t . Fatalf ( ` log should contain "unuathorized request" ` )
}
}
2015-01-21 04:45:18 +00:00
// Test user privilege authorization.
func TestServer_UserPrivilegeAuthorization ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create cluster admin.
s . CreateUser ( "admin" , "admin" , true )
admin := s . User ( "admin" )
// Create normal database user.
s . CreateUser ( "user1" , "user1" , false )
user1 := s . User ( "user1" )
user1 . Privileges [ "foo" ] = influxql . ReadPrivilege
s . Restart ( )
// admin user should be authorized for all privileges.
if ! admin . Authorize ( influxql . AllPrivileges , "" ) {
t . Fatalf ( "cluster admin doesn't have influxql.AllPrivileges" )
} else if ! admin . Authorize ( influxql . WritePrivilege , "" ) {
t . Fatalf ( "cluster admin doesn't have influxql.WritePrivilege" )
}
// Normal user with only read privilege on database foo.
if ! user1 . Authorize ( influxql . ReadPrivilege , "foo" ) {
t . Fatalf ( "user1 doesn't have influxql.ReadPrivilege on foo" )
} else if user1 . Authorize ( influxql . WritePrivilege , "foo" ) {
t . Fatalf ( "user1 has influxql.WritePrivilege on foo" )
} else if user1 . Authorize ( influxql . ReadPrivilege , "bar" ) {
t . Fatalf ( "user1 has influxql.ReadPrivilege on bar" )
} else if user1 . Authorize ( influxql . AllPrivileges , "" ) {
t . Fatalf ( "user1 is cluster admin" )
}
}
// Test single statement query authorization.
func TestServer_SingleStatementQueryAuthorization ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create cluster admin.
s . CreateUser ( "admin" , "admin" , true )
admin := s . User ( "admin" )
// Create normal database user.
s . CreateUser ( "user" , "user" , false )
user := s . User ( "user" )
user . Privileges [ "foo" ] = influxql . ReadPrivilege
s . Restart ( )
// Create a query that only cluster admins can run.
adminOnlyQuery := & influxql . Query {
Statements : [ ] influxql . Statement {
& influxql . DropDatabaseStatement { Name : "foo" } ,
} ,
}
// Create a query that requires read on one db and write on another.
readWriteQuery := & influxql . Query {
Statements : [ ] influxql . Statement {
& influxql . CreateContinuousQueryStatement {
Name : "myquery" ,
Database : "foo" ,
Source : & influxql . SelectStatement {
2015-02-01 20:33:12 +00:00
Fields : [ ] * influxql . Field { { Expr : & influxql . Call { Name : "count" } } } ,
2015-01-21 04:45:18 +00:00
Target : & influxql . Target { Measurement : "measure1" , Database : "bar" } ,
Source : & influxql . Measurement { Name : "myseries" } ,
} ,
} ,
} ,
}
// admin user should be authorized to execute any query.
2015-02-06 16:16:45 +00:00
if err := s . Authorize ( admin , adminOnlyQuery , "" ) ; err != nil {
2015-01-21 04:45:18 +00:00
t . Fatal ( err )
}
2015-02-06 16:16:45 +00:00
if err := s . Authorize ( admin , readWriteQuery , "foo" ) ; err != nil {
2015-01-21 04:45:18 +00:00
t . Fatal ( err )
}
// Normal user should not be authorized to execute admin only query.
2015-02-06 16:16:45 +00:00
if err := s . Authorize ( user , adminOnlyQuery , "" ) ; err == nil {
2015-01-21 04:45:18 +00:00
t . Fatalf ( "normal user should not be authorized to execute cluster admin level queries" )
}
// Normal user should not be authorized to execute query that selects into another
// database which (s)he doesn't have privileges on.
2015-02-06 16:16:45 +00:00
if err := s . Authorize ( user , readWriteQuery , "" ) ; err == nil {
2015-01-21 04:45:18 +00:00
t . Fatalf ( "normal user should not be authorized to write to database bar" )
}
// Grant normal user write privileges on database "bar".
user . Privileges [ "bar" ] = influxql . WritePrivilege
//Authorization on the previous query should now succeed.
2015-02-06 16:16:45 +00:00
if err := s . Authorize ( user , readWriteQuery , "" ) ; err != nil {
2015-01-21 04:45:18 +00:00
t . Fatal ( err )
}
}
// Test multiple statement query authorization.
func TestServer_MultiStatementQueryAuthorization ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create cluster admin.
s . CreateUser ( "admin" , "admin" , true )
admin := s . User ( "admin" )
// Create normal database user.
s . CreateUser ( "user" , "user" , false )
user := s . User ( "user" )
user . Privileges [ "foo" ] = influxql . ReadPrivilege
s . Restart ( )
// Create a query that requires read for one statement and write for the second.
readWriteQuery := & influxql . Query {
Statements : [ ] influxql . Statement {
// Statement that requires read.
& influxql . SelectStatement {
2015-02-01 20:33:12 +00:00
Fields : [ ] * influxql . Field { { Expr : & influxql . Call { Name : "count" } } } ,
2015-01-21 04:45:18 +00:00
Source : & influxql . Measurement { Name : "cpu" } ,
} ,
// Statement that requires write.
& influxql . SelectStatement {
2015-02-01 20:33:12 +00:00
Fields : [ ] * influxql . Field { { Expr : & influxql . Call { Name : "count" } } } ,
2015-01-21 04:45:18 +00:00
Source : & influxql . Measurement { Name : "cpu" } ,
Target : & influxql . Target { Measurement : "tmp" } ,
} ,
} ,
}
// Admin should be authorized to execute both statements in the query.
2015-02-06 16:16:45 +00:00
if err := s . Authorize ( admin , readWriteQuery , "foo" ) ; err != nil {
2015-01-21 04:45:18 +00:00
t . Fatal ( err )
}
// Normal user with only read privileges should not be authorized to execute both statements.
2015-02-06 16:16:45 +00:00
if err := s . Authorize ( user , readWriteQuery , "foo" ) ; err == nil {
2015-01-21 04:45:18 +00:00
t . Fatalf ( "user should not be authorized to execute both statements" )
}
}
2014-10-24 05:38:03 +00:00
// Ensure the server can create a database.
func TestServer_CreateDatabase ( t * testing . T ) {
2014-10-24 23:45:02 +00:00
s := OpenServer ( NewMessagingClient ( ) )
2014-10-24 05:38:03 +00:00
defer s . Close ( )
// Create the "foo" database.
if err := s . CreateDatabase ( "foo" ) ; err != nil {
t . Fatal ( err )
}
2014-11-05 05:32:17 +00:00
s . Restart ( )
2014-10-24 05:38:03 +00:00
// Verify that the database exists.
2014-12-23 06:18:05 +00:00
if ! s . DatabaseExists ( "foo" ) {
2014-10-24 05:38:03 +00:00
t . Fatalf ( "database not found" )
}
}
2014-10-24 23:45:02 +00:00
// Ensure the server returns an error when creating a duplicate database.
func TestServer_CreateDatabase_ErrDatabaseExists ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create the "foo" database twice.
if err := s . CreateDatabase ( "foo" ) ; err != nil {
t . Fatal ( err )
}
if err := s . CreateDatabase ( "foo" ) ; err != influxdb . ErrDatabaseExists {
t . Fatal ( err )
}
}
// Ensure the server can drop a database.
func TestServer_DropDatabase ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create the "foo" database and verify it exists.
if err := s . CreateDatabase ( "foo" ) ; err != nil {
t . Fatal ( err )
2014-12-23 06:18:05 +00:00
} else if ! s . DatabaseExists ( "foo" ) {
2014-10-24 23:45:02 +00:00
t . Fatalf ( "database not actually created" )
}
2014-11-05 05:32:17 +00:00
s . Restart ( )
2014-10-24 23:45:02 +00:00
// Drop the "foo" database and verify that it's gone.
2015-02-23 18:46:08 +00:00
if err := s . DropDatabase ( "foo" ) ; err != nil {
2014-10-24 23:45:02 +00:00
t . Fatal ( err )
2014-12-23 06:18:05 +00:00
} else if s . DatabaseExists ( "foo" ) {
2014-10-24 23:45:02 +00:00
t . Fatalf ( "database not actually dropped" )
}
}
// Ensure the server returns an error when dropping a database that doesn't exist.
func TestServer_DropDatabase_ErrDatabaseNotFound ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Drop a database that doesn't exist.
2015-02-23 18:46:08 +00:00
if err := s . DropDatabase ( "no_such_db" ) ; err != influxdb . ErrDatabaseNotFound {
2014-10-24 23:45:02 +00:00
t . Fatal ( err )
}
}
2014-10-28 00:16:03 +00:00
// Ensure the server can return a list of all databases.
func TestServer_Databases ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create some databases.
s . CreateDatabase ( "foo" )
s . CreateDatabase ( "bar" )
2014-11-05 05:32:17 +00:00
s . Restart ( )
2014-10-28 00:16:03 +00:00
// Return the databases.
if a := s . Databases ( ) ; len ( a ) != 2 {
t . Fatalf ( "unexpected db count: %d" , len ( a ) )
2014-12-23 06:18:05 +00:00
} else if a [ 0 ] != "bar" {
t . Fatalf ( "unexpected db(0): %s" , a [ 0 ] )
} else if a [ 1 ] != "foo" {
t . Fatalf ( "unexpected db(1): %s" , a [ 1 ] )
2014-10-28 00:16:03 +00:00
}
}
2014-12-23 06:18:05 +00:00
// Ensure the server can create a new user.
func TestServer_CreateUser ( t * testing . T ) {
2014-10-28 23:54:49 +00:00
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
2014-12-23 06:18:05 +00:00
// Create a user.
if err := s . CreateUser ( "susy" , "pass" , true ) ; err != nil {
2014-10-28 23:54:49 +00:00
t . Fatal ( err )
}
2014-11-05 05:32:17 +00:00
s . Restart ( )
2014-10-28 23:54:49 +00:00
2014-12-23 06:18:05 +00:00
// Verify that the user exists.
if u := s . User ( "susy" ) ; u == nil {
t . Fatalf ( "user not found" )
2014-10-28 23:54:49 +00:00
} else if u . Name != "susy" {
t . Fatalf ( "username mismatch: %v" , u . Name )
2014-12-23 06:18:05 +00:00
} else if ! u . Admin {
t . Fatalf ( "admin mismatch: %v" , u . Admin )
2014-10-28 23:54:49 +00:00
} else if bcrypt . CompareHashAndPassword ( [ ] byte ( u . Hash ) , [ ] byte ( "pass" ) ) != nil {
t . Fatal ( "invalid password" )
}
2015-01-05 06:23:18 +00:00
// Verify that the authenticated user exists.
2015-01-06 17:19:31 +00:00
u , err := s . Authenticate ( "susy" , "pass" )
2015-01-05 06:23:18 +00:00
if err != nil {
t . Fatalf ( "error fetching authenticated user" )
} else if u . Name != "susy" {
t . Fatalf ( "username mismatch: %v" , u . Name )
} else if ! u . Admin {
t . Fatalf ( "admin mismatch: %v" , u . Admin )
} else if bcrypt . CompareHashAndPassword ( [ ] byte ( u . Hash ) , [ ] byte ( "pass" ) ) != nil {
t . Fatal ( "invalid password" )
}
2014-10-28 23:54:49 +00:00
}
2015-01-02 21:19:32 +00:00
// Ensure the server correctly detects when there is an admin user.
func TestServer_AdminUserExists ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// A server should start up without any admin user.
if s . AdminUserExists ( ) {
t . Fatalf ( "admin user unexpectedly exists at start-up" )
}
// Create a non-admin user and verify Server agrees there is no admin user.
if err := s . CreateUser ( "bert" , "pass" , false ) ; err != nil {
t . Fatal ( err )
}
s . Restart ( )
if s . AdminUserExists ( ) {
t . Fatalf ( "admin user unexpectedly exists" )
}
// Next, create an admin user, and ensure the Server agrees there is an admin user.
if err := s . CreateUser ( "ernie" , "pass" , true ) ; err != nil {
t . Fatal ( err )
}
s . Restart ( )
if ! s . AdminUserExists ( ) {
t . Fatalf ( "admin user does not exist" )
}
2014-10-28 23:54:49 +00:00
}
2014-12-23 06:18:05 +00:00
// Ensure the server returns an error when creating an user without a name.
func TestServer_CreateUser_ErrUsernameRequired ( t * testing . T ) {
2014-10-29 00:43:03 +00:00
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
2014-12-23 06:18:05 +00:00
if err := s . CreateUser ( "" , "pass" , false ) ; err != influxdb . ErrUsernameRequired {
2014-10-29 00:43:03 +00:00
t . Fatal ( err )
}
}
2014-12-23 06:18:05 +00:00
// Ensure the server returns an error when creating a duplicate user.
func TestServer_CreateUser_ErrUserExists ( t * testing . T ) {
2014-10-29 00:43:03 +00:00
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
2014-12-23 06:18:05 +00:00
if err := s . CreateUser ( "susy" , "pass" , false ) ; err != nil {
2014-10-29 00:43:03 +00:00
t . Fatal ( err )
}
2014-12-23 06:18:05 +00:00
if err := s . CreateUser ( "susy" , "pass" , false ) ; err != influxdb . ErrUserExists {
2014-10-29 00:43:03 +00:00
t . Fatal ( err )
}
}
2014-12-23 06:18:05 +00:00
// Ensure the server can delete an existing user.
func TestServer_DeleteUser ( t * testing . T ) {
2014-10-29 00:43:03 +00:00
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
2014-12-23 06:18:05 +00:00
// Create a user.
if err := s . CreateUser ( "susy" , "pass" , false ) ; err != nil {
2014-10-29 00:43:03 +00:00
t . Fatal ( err )
2014-12-23 06:18:05 +00:00
} else if s . User ( "susy" ) == nil {
t . Fatalf ( "user not created" )
2014-10-29 00:43:03 +00:00
}
2014-12-23 06:18:05 +00:00
// Delete the user.
if err := s . DeleteUser ( "susy" ) ; err != nil {
2014-10-29 00:43:03 +00:00
t . Fatal ( err )
2014-12-23 06:18:05 +00:00
} else if s . User ( "susy" ) != nil {
t . Fatalf ( "user not actually deleted" )
2014-10-29 00:43:03 +00:00
}
2014-11-05 05:32:17 +00:00
s . Restart ( )
2014-12-23 06:18:05 +00:00
if s . User ( "susy" ) != nil {
t . Fatalf ( "user not actually deleted after restart" )
2014-11-05 05:32:17 +00:00
}
2014-10-29 00:43:03 +00:00
}
2014-12-23 06:18:05 +00:00
// Ensure the server can return a list of all users.
func TestServer_Users ( t * testing . T ) {
2014-10-29 01:34:12 +00:00
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
2015-01-05 06:11:37 +00:00
// Create some users.
2014-12-23 06:18:05 +00:00
s . CreateUser ( "susy" , "pass" , false )
s . CreateUser ( "john" , "pass" , false )
2014-11-05 05:32:17 +00:00
s . Restart ( )
2014-10-29 01:34:12 +00:00
2015-01-05 06:11:37 +00:00
// Return the users.
2014-12-23 06:18:05 +00:00
if a := s . Users ( ) ; len ( a ) != 2 {
t . Fatalf ( "unexpected user count: %d" , len ( a ) )
2014-10-29 01:34:12 +00:00
} else if a [ 0 ] . Name != "john" {
2014-12-23 06:18:05 +00:00
t . Fatalf ( "unexpected user(0): %s" , a [ 0 ] . Name )
2014-10-29 01:34:12 +00:00
} else if a [ 1 ] . Name != "susy" {
2014-12-23 06:18:05 +00:00
t . Fatalf ( "unexpected user(1): %s" , a [ 1 ] . Name )
2014-10-29 01:34:12 +00:00
}
}
2015-01-05 06:23:18 +00:00
// Ensure the server does not return non-existent users
func TestServer_NonExistingUsers ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create some users.
s . CreateUser ( "susy" , "pass" , false )
s . CreateUser ( "john" , "pass2" , false )
s . Restart ( )
// Ask for users that should not be returned.
u := s . User ( "bob" )
if u != nil {
t . Fatalf ( "unexpected user found" )
}
2015-01-06 17:19:31 +00:00
u , err := s . Authenticate ( "susy" , "wrong_password" )
2015-01-05 06:23:18 +00:00
if err == nil {
t . Fatalf ( "unexpected authenticated user found" )
}
}
2014-12-23 06:18:05 +00:00
// Ensure the database can create a new retention policy.
func TestServer_CreateRetentionPolicy ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create a database.
if err := s . CreateDatabase ( "foo" ) ; err != nil {
t . Fatal ( err )
}
// Create a retention policy on the database.
rp := & influxdb . RetentionPolicy {
Name : "bar" ,
Duration : time . Hour ,
ReplicaN : 2 ,
}
if err := s . CreateRetentionPolicy ( "foo" , rp ) ; err != nil {
t . Fatal ( err )
}
s . Restart ( )
// Verify that the policy exists.
if o , err := s . RetentionPolicy ( "foo" , "bar" ) ; err != nil {
t . Fatalf ( "unexpected error: %s" , err )
} else if o == nil {
t . Fatalf ( "retention policy not found" )
} else if ! reflect . DeepEqual ( rp , o ) {
t . Fatalf ( "retention policy mismatch: %#v" , o )
}
}
// Ensure the server returns an error when creating a retention policy with an invalid db.
func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
if err := s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "bar" } ) ; err != influxdb . ErrDatabaseNotFound {
t . Fatal ( err )
}
}
// Ensure the server returns an error when creating a retention policy without a name.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
if err := s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "" } ) ; err != influxdb . ErrRetentionPolicyNameRequired {
t . Fatal ( err )
}
}
// Ensure the server returns an error when creating a duplicate retention policy.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "bar" } )
if err := s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "bar" } ) ; err != influxdb . ErrRetentionPolicyExists {
t . Fatal ( err )
}
}
2015-02-05 17:54:06 +00:00
// Ensure the database can alter an existing retention policy.
func TestServer_AlterRetentionPolicy ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create a database.
if err := s . CreateDatabase ( "foo" ) ; err != nil {
t . Fatal ( err )
}
// Create a retention policy on the database.
rp := & influxdb . RetentionPolicy {
Name : "bar" ,
Duration : time . Hour ,
ReplicaN : 2 ,
}
if err := s . CreateRetentionPolicy ( "foo" , rp ) ; err != nil {
t . Fatal ( err )
}
// Alter the retention policy.
duration := time . Minute
replicaN := uint32 ( 3 )
rp2 := & influxdb . RetentionPolicyUpdate {
Duration : & duration ,
ReplicaN : & replicaN ,
}
if err := s . UpdateRetentionPolicy ( "foo" , "bar" , rp2 ) ; err != nil {
t . Fatal ( err )
}
// Restart the server to make sure the changes persist afterwards.
s . Restart ( )
// Verify that the policy exists.
if o , err := s . RetentionPolicy ( "foo" , "bar" ) ; err != nil {
t . Fatalf ( "unexpected error: %s" , err )
} else if o == nil {
t . Fatalf ( "retention policy not found" )
} else if o . Duration != * rp2 . Duration {
t . Fatalf ( "retention policy mismatch:\n\texp Duration = %s\n\tgot Duration = %s\n" , rp2 . Duration , o . Duration )
} else if o . ReplicaN != * rp2 . ReplicaN {
t . Fatalf ( "retention policy mismatch:\n\texp ReplicaN = %d\n\tgot ReplicaN = %d\n" , rp2 . ReplicaN , o . ReplicaN )
}
}
2014-12-23 06:18:05 +00:00
// Ensure the server can delete an existing retention policy.
func TestServer_DeleteRetentionPolicy ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create a database and retention policy.
s . CreateDatabase ( "foo" )
if err := s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "bar" } ) ; err != nil {
t . Fatal ( err )
} else if rp , _ := s . RetentionPolicy ( "foo" , "bar" ) ; rp == nil {
t . Fatal ( "retention policy not created" )
}
// Remove retention policy from database.
if err := s . DeleteRetentionPolicy ( "foo" , "bar" ) ; err != nil {
t . Fatal ( err )
} else if rp , _ := s . RetentionPolicy ( "foo" , "bar" ) ; rp != nil {
t . Fatal ( "retention policy not deleted" )
}
s . Restart ( )
if rp , _ := s . RetentionPolicy ( "foo" , "bar" ) ; rp != nil {
t . Fatal ( "retention policy not deleted after restart" )
}
}
// Ensure the server returns an error when deleting a retention policy on invalid db.
func TestServer_DeleteRetentionPolicy_ErrDatabaseNotFound ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
if err := s . DeleteRetentionPolicy ( "foo" , "bar" ) ; err != influxdb . ErrDatabaseNotFound {
t . Fatal ( err )
}
}
// Ensure the server returns an error when deleting a retention policy without a name.
func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
if err := s . DeleteRetentionPolicy ( "foo" , "" ) ; err != influxdb . ErrRetentionPolicyNameRequired {
t . Fatal ( err )
}
}
// Ensure the server returns an error when deleting a non-existent retention policy.
func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNotFound ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
if err := s . DeleteRetentionPolicy ( "foo" , "no_such_policy" ) ; err != influxdb . ErrRetentionPolicyNotFound {
t . Fatal ( err )
}
}
// Ensure the server can set the default retention policy
func TestServer_SetDefaultRetentionPolicy ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
rp := & influxdb . RetentionPolicy { Name : "bar" }
if err := s . CreateRetentionPolicy ( "foo" , rp ) ; err != nil {
t . Fatal ( err )
} else if rp , _ := s . RetentionPolicy ( "foo" , "bar" ) ; rp == nil {
t . Fatal ( "retention policy not created" )
}
// Set bar as default
if err := s . SetDefaultRetentionPolicy ( "foo" , "bar" ) ; err != nil {
t . Fatal ( err )
}
if o , _ := s . DefaultRetentionPolicy ( "foo" ) ; o == nil {
t . Fatal ( "default policy not set" )
} else if ! reflect . DeepEqual ( rp , o ) {
t . Fatalf ( "retention policy mismatch: %#v" , o )
}
s . Restart ( )
if o , _ := s . DefaultRetentionPolicy ( "foo" ) ; o == nil {
t . Fatal ( "default policy not kept after restart" )
} else if ! reflect . DeepEqual ( rp , o ) {
t . Fatalf ( "retention policy mismatch after restart: %#v" , o )
}
}
2015-02-10 21:46:49 +00:00
// Ensure the server returns an error when setting the default retention policy to a non-existant one.
2014-12-23 06:18:05 +00:00
func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
if err := s . SetDefaultRetentionPolicy ( "foo" , "no_such_policy" ) ; err != influxdb . ErrRetentionPolicyNotFound {
t . Fatal ( err )
}
}
2015-02-10 21:41:55 +00:00
// Ensure the server prohibits a zero check interval for retention policy enforcement.
2015-02-11 01:32:19 +00:00
func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval ( t * testing . T ) {
2015-02-10 21:41:55 +00:00
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
2015-02-11 01:17:28 +00:00
if err := s . StartRetentionPolicyEnforcement ( time . Duration ( 0 ) ) ; err == nil {
2015-02-10 21:41:55 +00:00
t . Fatal ( "failed to prohibit retention policies zero check interval" )
}
}
2015-02-11 01:32:19 +00:00
func TestServer_EnforceRetentionPolices ( t * testing . T ) {
c := NewMessagingClient ( )
s := OpenServer ( c )
defer s . Close ( )
s . CreateDatabase ( "foo" )
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "mypolicy" , Duration : 30 * time . Minute } )
// Create two shard groups for the the new retention policy -- 1 which will age out immediately
// the other in more than an hour.
s . CreateShardGroupIfNotExists ( "foo" , "mypolicy" , time . Now ( ) . Add ( - 1 * time . Hour ) )
s . CreateShardGroupIfNotExists ( "foo" , "mypolicy" , time . Now ( ) . Add ( time . Hour ) )
// Check the two shard groups exist.
var g [ ] * influxdb . ShardGroup
g , err := s . ShardGroups ( "foo" )
if err != nil {
t . Fatal ( err )
} else if len ( g ) != 2 {
t . Fatalf ( "expected 2 shard group but found %d" , len ( g ) )
}
// Run retention enforcement.
s . EnforceRetentionPolicies ( )
// Ensure enforcement is in effect across restarts.
s . Restart ( )
// First shard group should have been removed.
g , err = s . ShardGroups ( "foo" )
if err != nil {
t . Fatal ( err )
} else if len ( g ) != 1 {
t . Fatalf ( "expected 1 shard group but found %d" , len ( g ) )
}
}
2014-12-23 06:18:05 +00:00
// Ensure the database can write data to the database.
func TestServer_WriteSeries ( t * testing . T ) {
2015-01-10 15:48:50 +00:00
c := NewMessagingClient ( )
s := OpenServer ( c )
2014-12-23 06:18:05 +00:00
defer s . Close ( )
s . CreateDatabase ( "foo" )
2015-01-12 20:10:26 +00:00
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "mypolicy" , Duration : 1 * time . Hour } )
2014-12-23 06:18:05 +00:00
s . CreateUser ( "susy" , "pass" , false )
2015-01-10 15:48:50 +00:00
// Check if a topic is being subscribed to.
var subscribed bool
c . SubscribeFunc = func ( replicaID , topicID uint64 ) error {
subscribed = true
return nil
}
2014-12-23 06:18:05 +00:00
// Write series with one point to the database.
tags := map [ string ] string { "host" : "servera.influx.com" , "region" : "uswest" }
2015-02-23 22:37:10 +00:00
index , err := s . WriteSeries ( "foo" , "mypolicy" , [ ] influxdb . Point { { Name : "cpu_load" , Tags : tags , Timestamp : mustParseTime ( "2000-01-01T00:00:00Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 23.2 ) } } } )
2015-01-13 17:16:43 +00:00
if err != nil {
2015-01-10 20:22:57 +00:00
t . Fatal ( err )
2015-01-13 17:16:43 +00:00
} else if err = s . Sync ( index ) ; err != nil {
t . Fatalf ( "sync error: %s" , err )
2015-01-10 20:22:57 +00:00
}
// Write another point 10 seconds later so it goes through "raw series".
2015-02-23 22:37:10 +00:00
index , err = s . WriteSeries ( "foo" , "mypolicy" , [ ] influxdb . Point { { Name : "cpu_load" , Tags : tags , Timestamp : mustParseTime ( "2000-01-01T00:00:10Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 100 ) } } } )
2015-01-13 17:16:43 +00:00
if err != nil {
2014-12-23 06:18:05 +00:00
t . Fatal ( err )
2015-01-13 17:16:43 +00:00
} else if err = s . Sync ( index ) ; err != nil {
t . Fatalf ( "sync error: %s" , err )
2014-12-23 06:18:05 +00:00
}
2015-01-10 15:48:50 +00:00
// Verify a subscription was made.
if ! subscribed {
t . Fatal ( "expected subscription" )
}
2014-12-24 00:01:06 +00:00
2015-01-13 17:16:43 +00:00
// Retrieve first series data point.
2015-01-12 20:10:26 +00:00
if v , err := s . ReadSeries ( "foo" , "mypolicy" , "cpu_load" , tags , mustParseTime ( "2000-01-01T00:00:00Z" ) ) ; err != nil {
2015-01-10 15:48:50 +00:00
t . Fatal ( err )
2015-01-14 23:44:09 +00:00
} else if ! reflect . DeepEqual ( v , map [ string ] interface { } { "value" : float64 ( 23.2 ) } ) {
2015-01-10 15:48:50 +00:00
t . Fatalf ( "values mismatch: %#v" , v )
}
2015-01-13 17:16:43 +00:00
// Retrieve second series data point.
if v , err := s . ReadSeries ( "foo" , "mypolicy" , "cpu_load" , tags , mustParseTime ( "2000-01-01T00:00:10Z" ) ) ; err != nil {
t . Fatal ( err )
2015-01-14 23:44:09 +00:00
} else if mustMarshalJSON ( v ) != mustMarshalJSON ( map [ string ] interface { } { "value" : float64 ( 100 ) } ) {
2015-01-10 15:48:50 +00:00
t . Fatalf ( "values mismatch: %#v" , v )
2015-01-13 17:16:43 +00:00
}
// Retrieve non-existent series data point.
if v , err := s . ReadSeries ( "foo" , "mypolicy" , "cpu_load" , tags , mustParseTime ( "2000-01-01T00:01:00Z" ) ) ; err != nil {
t . Fatal ( err )
} else if v != nil {
t . Fatalf ( "expected nil values: %#v" , v )
}
2014-12-23 06:18:05 +00:00
}
2015-02-24 21:28:20 +00:00
// Ensure the server can drop a measurement.
func TestServer_DropMeasurement ( t * testing . T ) {
c := NewMessagingClient ( )
s := OpenServer ( c )
defer s . Close ( )
s . CreateDatabase ( "foo" )
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "raw" , Duration : 1 * time . Hour } )
s . SetDefaultRetentionPolicy ( "foo" , "raw" )
s . CreateUser ( "susy" , "pass" , false )
// Write series with one point to the database.
tags := map [ string ] string { "host" : "serverA" , "region" : "uswest" }
index , err := s . WriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : tags , Timestamp : mustParseTime ( "2000-01-01T00:00:00Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 23.2 ) } } } )
if err != nil {
t . Fatal ( err )
} else if err = s . Sync ( index ) ; err != nil {
t . Fatalf ( "sync error: %s" , err )
}
// Ensure measurement exists
results := s . ExecuteQuery ( MustParseQuery ( ` SHOW MEASUREMENTS ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"measurements","columns":["name"],"values":[["cpu"]]}]} ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
// Ensure series exists
results = s . ExecuteQuery ( MustParseQuery ( ` SHOW SERIES ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":["host","region"],"values":[["serverA","uswest"]]}]} ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
// Drop measurement
results = s . ExecuteQuery ( MustParseQuery ( ` DROP MEASUREMENT cpu ` ) , "foo" , nil )
if results . Error ( ) != nil {
t . Fatalf ( "unexpected error: %s" , results . Error ( ) )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SHOW MEASUREMENTS ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 0 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { } ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SHOW SERIES ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 0 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { } ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
2015-02-25 00:18:10 +00:00
}
// Ensure Drop measurement can:
// write to measurement cpu with tags region=uswest host=serverA
// write to measurement memory with tags region=uswest host=serverB
// drop one of those measurements
// ensure that the dropped measurement is gone
// ensure that we can still query: show measurements
// ensure that we can still make various queries:
// select * from memory where region=uswest and host=serverb
// select * from memory where host=serverb
// select * from memory where region=uswest
func TestServer_DropMeasurementSeriesTagsPreserved ( t * testing . T ) {
c := NewMessagingClient ( )
s := OpenServer ( c )
defer s . Close ( )
s . CreateDatabase ( "foo" )
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "raw" , Duration : 1 * time . Hour } )
s . SetDefaultRetentionPolicy ( "foo" , "raw" )
s . CreateUser ( "susy" , "pass" , false )
// Write series with one point to the database.
tags := map [ string ] string { "host" : "serverA" , "region" : "uswest" }
index , err := s . WriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : tags , Timestamp : mustParseTime ( "2000-01-01T00:00:00Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 23.2 ) } } } )
if err != nil {
t . Fatal ( err )
} else if err = s . Sync ( index ) ; err != nil {
t . Fatalf ( "sync error: %s" , err )
}
tags = map [ string ] string { "host" : "serverB" , "region" : "uswest" }
index , err = s . WriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "memory" , Tags : tags , Timestamp : mustParseTime ( "2000-01-01T00:00:01Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 33.2 ) } } } )
if err != nil {
t . Fatal ( err )
} else if err = s . Sync ( index ) ; err != nil {
t . Fatalf ( "sync error: %s" , err )
}
2015-02-24 21:28:20 +00:00
2015-02-25 00:18:10 +00:00
// Ensure measurement exists
results := s . ExecuteQuery ( MustParseQuery ( ` SHOW MEASUREMENTS ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"measurements","columns":["name"],"values":[["cpu"],["memory"]]}]} ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SHOW SERIES ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 2 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":["host","region"],"values":[["serverA","uswest"]]}, { "name":"memory","columns":["host","region"],"values":[["serverB","uswest"]]}]} ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
// Ensure we can query for memory with both tags
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT * FROM memory where region='uswest' and host='serverB' ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"memory","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]} ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
// Drop measurement
results = s . ExecuteQuery ( MustParseQuery ( ` DROP MEASUREMENT cpu ` ) , "foo" , nil )
if results . Error ( ) != nil {
t . Fatalf ( "unexpected error: %s" , results . Error ( ) )
}
// Ensure measurement exists
results = s . ExecuteQuery ( MustParseQuery ( ` SHOW MEASUREMENTS ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"measurements","columns":["name"],"values":[["memory"]]}]} ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SHOW SERIES ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"memory","columns":["host","region"],"values":[["serverB","uswest"]]}]} ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT * FROM cpu ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err . Error ( ) != ` measurement "foo"."raw"."cpu" does not exist. ` {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 0 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT * FROM memory where host='serverB' ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"memory","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]} ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT * FROM memory where region='uswest' ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"memory","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]} ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT * FROM memory where region='uswest' and host='serverB' ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"memory","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]} ` {
t . Fatalf ( "unexpected row(0): %s" , s )
}
2015-02-24 21:28:20 +00:00
}
2015-02-20 20:30:52 +00:00
// Ensure the server can drop a series.
func TestServer_DropSeries ( t * testing . T ) {
c := NewMessagingClient ( )
s := OpenServer ( c )
defer s . Close ( )
s . CreateDatabase ( "foo" )
2015-02-22 03:21:02 +00:00
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "raw" , Duration : 1 * time . Hour } )
s . SetDefaultRetentionPolicy ( "foo" , "raw" )
2015-02-20 20:30:52 +00:00
s . CreateUser ( "susy" , "pass" , false )
// Write series with one point to the database.
2015-02-22 03:21:02 +00:00
tags := map [ string ] string { "host" : "serverA" , "region" : "uswest" }
2015-02-23 22:37:10 +00:00
index , err := s . WriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : tags , Timestamp : mustParseTime ( "2000-01-01T00:00:00Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 23.2 ) } } } )
2015-02-20 20:30:52 +00:00
if err != nil {
t . Fatal ( err )
} else if err = s . Sync ( index ) ; err != nil {
t . Fatalf ( "sync error: %s" , err )
}
2015-02-24 21:28:20 +00:00
// Ensure series exists
2015-02-22 03:21:02 +00:00
results := s . ExecuteQuery ( MustParseQuery ( ` SHOW SERIES ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
2015-02-23 05:21:49 +00:00
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":["host","region"],"values":[["serverA","uswest"]]}]} ` {
2015-02-22 03:21:02 +00:00
t . Fatalf ( "unexpected row(0): %s" , s )
}
// Drop series
results = s . ExecuteQuery ( MustParseQuery ( ` DROP SERIES FROM cpu ` ) , "foo" , nil )
if results . Error ( ) != nil {
t . Fatalf ( "unexpected error: %s" , results . Error ( ) )
2015-02-20 20:30:52 +00:00
}
2015-02-22 03:21:02 +00:00
results = s . ExecuteQuery ( MustParseQuery ( ` SHOW SERIES ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
2015-02-23 05:21:49 +00:00
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":[]}]} ` {
2015-02-22 03:21:02 +00:00
t . Fatalf ( "unexpected row(0): %s" , s )
}
2015-02-20 20:30:52 +00:00
}
2015-02-22 03:06:58 +00:00
// Ensure Drop Series can:
// write to measurement cpu with tags region=uswest host=serverA
// write to measurement cpu with tags region=uswest host=serverB
// drop one of those series
// ensure that the dropped series is gone
// ensure that we can still query: select value from cpu where region=uswest
2015-02-22 04:20:28 +00:00
func TestServer_DropSeriesTagsPreserved ( t * testing . T ) {
2015-02-22 03:06:58 +00:00
c := NewMessagingClient ( )
s := OpenServer ( c )
defer s . Close ( )
s . CreateDatabase ( "foo" )
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "raw" , Duration : 1 * time . Hour } )
s . SetDefaultRetentionPolicy ( "foo" , "raw" )
s . CreateUser ( "susy" , "pass" , false )
// Write series with one point to the database.
tags := map [ string ] string { "host" : "serverA" , "region" : "uswest" }
2015-02-23 22:37:10 +00:00
index , err := s . WriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : tags , Timestamp : mustParseTime ( "2000-01-01T00:00:00Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 23.2 ) } } } )
2015-02-22 03:06:58 +00:00
if err != nil {
t . Fatal ( err )
} else if err = s . Sync ( index ) ; err != nil {
t . Fatalf ( "sync error: %s" , err )
}
tags = map [ string ] string { "host" : "serverB" , "region" : "uswest" }
2015-02-23 22:37:10 +00:00
index , err = s . WriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : tags , Timestamp : mustParseTime ( "2000-01-01T00:00:01Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 33.2 ) } } } )
2015-02-22 03:06:58 +00:00
if err != nil {
t . Fatal ( err )
} else if err = s . Sync ( index ) ; err != nil {
t . Fatalf ( "sync error: %s" , err )
}
results := s . ExecuteQuery ( MustParseQuery ( ` SHOW SERIES ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
2015-02-23 05:21:49 +00:00
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":["host","region"],"values":[["serverA","uswest"],["serverB","uswest"]]}]} ` {
2015-02-22 03:06:58 +00:00
t . Fatalf ( "unexpected row(0): %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` DROP SERIES FROM cpu where host='serverA' ` ) , "foo" , nil )
if results . Error ( ) != nil {
t . Fatalf ( "unexpected error: %s" , results . Error ( ) )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SHOW SERIES ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
2015-02-23 05:21:49 +00:00
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":["host","region"],"values":[["serverB","uswest"]]}]} ` {
2015-02-22 03:06:58 +00:00
t . Fatalf ( "unexpected row(0): %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT * FROM cpu where host='serverA' ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
2015-02-23 05:21:49 +00:00
} else if len ( res . Series ) != 0 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
2015-02-22 03:06:58 +00:00
}
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT * FROM cpu where host='serverB' ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
2015-02-23 05:21:49 +00:00
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]} ` {
2015-02-22 03:06:58 +00:00
t . Fatalf ( "unexpected row(0): %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT * FROM cpu where region='uswest' ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error: %s" , res . Err )
2015-02-23 05:21:49 +00:00
} else if len ( res . Series ) != 1 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]} ` {
2015-02-22 03:06:58 +00:00
t . Fatalf ( "unexpected row(0): %s" , s )
}
}
2015-01-14 23:44:09 +00:00
// Ensure the server can execute a query and return the data correctly.
func TestServer_ExecuteQuery ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "raw" , Duration : 1 * time . Hour } )
s . SetDefaultRetentionPolicy ( "foo" , "raw" )
s . CreateUser ( "susy" , "pass" , false )
// Write series with one point to the database.
2015-02-23 22:37:10 +00:00
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" } , Timestamp : mustParseTime ( "2000-01-01T00:00:00Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 20 ) } } } )
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" } , Timestamp : mustParseTime ( "2000-01-01T00:00:10Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 30 ) } } } )
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-west" } , Timestamp : mustParseTime ( "2000-01-01T00:00:00Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 100 ) } } } )
2015-01-14 23:44:09 +00:00
// Select data from the server.
2015-01-28 03:56:30 +00:00
results := s . ExecuteQuery ( MustParseQuery ( ` SELECT sum(value) FROM cpu GROUP BY time(10s), region ` ) , "foo" , nil )
2015-01-27 20:27:29 +00:00
if res := results . Results [ 0 ] ; res . Err != nil {
2015-01-14 23:44:09 +00:00
t . Fatalf ( "unexpected error: %s" , res . Err )
2015-02-23 05:21:49 +00:00
} else if len ( res . Series ) != 2 {
t . Fatalf ( "unexpected row count: %d" , len ( res . Series ) )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","tags": { "region":"us-east"},"columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",20],["2000-01-01T00:00:10Z",30]]}, { "name":"cpu","tags": { "region":"us-west"},"columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",100]]}]} ` {
2015-01-14 23:44:09 +00:00
t . Fatalf ( "unexpected row(0): %s" , s )
}
2015-02-04 11:51:01 +00:00
2015-02-07 05:55:28 +00:00
// Simple non-aggregation.
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT value FROM cpu WHERE time >= '2000-01-01 00:00:05' ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error during simple SELECT: %s" , res . Err )
2015-02-23 05:21:49 +00:00
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:10Z",30]]}]} ` {
2015-02-07 05:55:28 +00:00
t . Fatalf ( "unexpected row(0) during simple SELECT: %s" , s )
}
// Sum aggregation.
2015-02-07 04:36:38 +00:00
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT sum(value) FROM cpu WHERE time >= '2000-01-01 00:00:05' GROUP BY time(10s), region ` ) , "foo" , nil )
2015-02-04 11:51:01 +00:00
if res := results . Results [ 0 ] ; res . Err != nil {
2015-02-07 05:55:28 +00:00
t . Fatalf ( "unexpected error during SUM: %s" , res . Err )
2015-02-23 05:21:49 +00:00
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","tags": { "region":"us-east"},"columns":["time","sum"],"values":[["2000-01-01T00:00:10Z",30]]}]} ` {
2015-02-07 05:55:28 +00:00
t . Fatalf ( "unexpected row(0) during SUM: %s" , s )
2015-02-04 11:51:01 +00:00
}
2015-02-09 08:39:42 +00:00
// Aggregation with a null field value
2015-02-23 22:37:10 +00:00
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" } , Timestamp : mustParseTime ( "2000-01-01T00:00:03Z" ) , Fields : map [ string ] interface { } { "otherVal" : float64 ( 20 ) } } } )
2015-02-09 08:39:42 +00:00
// Sum aggregation.
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT sum(value) FROM cpu GROUP BY region ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error during SUM: %s" , res . Err )
2015-02-23 05:21:49 +00:00
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","tags": { "region":"us-east"},"columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",50]]}, { "name":"cpu","tags": { "region":"us-west"},"columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",100]]}]} ` {
2015-02-09 08:39:42 +00:00
t . Fatalf ( "unexpected row(0) during SUM: %s" , s )
}
2015-02-24 22:10:05 +00:00
// WHERE with AND
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "uk" , "host" : "serverZ" , "service" : "redis" } , Timestamp : mustParseTime ( "2000-01-01T00:00:03Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 20 ) } } } )
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "uk" , "host" : "serverZ" , "service" : "mysql" } , Timestamp : mustParseTime ( "2000-01-01T00:00:03Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 30 ) } } } )
// Sum aggregation.
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT sum(value) FROM cpu WHERE region='uk' AND host='serverZ' ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error during SUM: %s" , res . Err )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",50]]}]} ` {
t . Fatalf ( "unexpected row(0) during SUM AND: %s" , s )
}
2015-02-25 18:10:13 +00:00
// TODO re-enable. The following code is racy
//results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu WHERE region='uk' AND host='serverZ'`), "foo", nil)
//if res := results.Results[0]; res.Err != nil {
//t.Fatalf("unexpected error during SUM: %s", res.Err)
//} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["time","value","otherVal"],"values":[["2000-01-01T00:00:03Z",30,0],["2000-01-01T00:00:03Z",20,0]]}]}` {
//t.Fatalf("unexpected row(0) during SUM AND: %s", s)
//}
2014-12-23 06:18:05 +00:00
}
2015-02-24 02:36:19 +00:00
// Ensure that when querying for raw data values that they return in time order
func TestServer_RawDataReturnsInOrder ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "raw" , Duration : 1 * time . Hour } )
s . SetDefaultRetentionPolicy ( "foo" , "raw" )
s . CreateUser ( "susy" , "pass" , false )
for i := 1 ; i < 500 ; i ++ {
host := fmt . Sprintf ( "server-%d" , i % 10 )
2015-02-24 05:17:35 +00:00
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" , "host" : host } , Timestamp : time . Unix ( int64 ( i ) , 0 ) , Fields : map [ string ] interface { } { "value" : float64 ( i ) } } } )
2015-02-24 02:36:19 +00:00
}
results := s . ExecuteQuery ( MustParseQuery ( ` SELECT count(value) FROM cpu ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error during COUNT: %s" , res . Err )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",499]]}]} ` {
t . Fatalf ( "unexpected row(0) during COUNT: %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT value FROM cpu ` ) , "foo" , nil )
lastTime := int64 ( 0 )
for _ , v := range results . Results [ 0 ] . Series [ 0 ] . Values {
tt := v [ 0 ] . ( time . Time )
if lastTime > tt . UnixNano ( ) {
t . Fatal ( "values out of order" )
}
lastTime = tt . UnixNano ( )
}
2015-02-24 03:51:43 +00:00
if len ( results . Results [ 0 ] . Series [ 0 ] . Values ) != 499 {
t . Fatal ( "expected 499 values" )
}
2015-02-24 05:43:53 +00:00
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT value FROM cpu GROUP BY * ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error during GROUP BY *: %s" , res . Err )
} else if len ( res . Series ) != 10 {
t . Fatalf ( "expected 10 series back but got %d" , len ( res . Series ) )
} else if len ( res . Series [ 0 ] . Values ) != 50 {
t . Fatalf ( "expected 50 values per series but got %d" , len ( res . Series [ 0 ] . Values ) )
}
2015-02-24 02:36:19 +00:00
}
2015-02-24 06:16:06 +00:00
// Ensure that limit and offset work
func TestServer_LimitAndOffset ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "raw" , Duration : 1 * time . Hour } )
s . SetDefaultRetentionPolicy ( "foo" , "raw" )
for i := 1 ; i < 10 ; i ++ {
host := fmt . Sprintf ( "server-%d" , i )
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" , "host" : host } , Timestamp : time . Unix ( int64 ( i ) , 0 ) , Fields : map [ string ] interface { } { "value" : float64 ( i ) } } } )
}
results := s . ExecuteQuery ( MustParseQuery ( ` SELECT count(value) FROM cpu GROUP BY * LIMIT 20 ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error during COUNT: %s" , res . Err )
} else if len ( res . Series ) != 9 {
t . Fatalf ( "unexpected 9 series back but got %d" , len ( res . Series ) )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT count(value) FROM cpu GROUP BY * LIMIT 2 OFFSET 1 ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error during COUNT: %s" , res . Err )
2015-02-24 06:42:11 +00:00
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","tags": { "host":"server-2","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}, { "name":"cpu","tags": { "host":"server-3","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]} ` {
2015-02-24 06:16:06 +00:00
t . Fatalf ( "unexpected row(0) during COUNT: %s" , s )
}
2015-02-24 06:42:11 +00:00
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT count(value) FROM cpu GROUP BY * LIMIT 2 OFFSET 3 ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error during COUNT: %s" , res . Err )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","tags": { "host":"server-5","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}, { "name":"cpu","tags": { "host":"server-4","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]} ` {
t . Fatalf ( "unexpected row(0) during COUNT: %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT count(value) FROM cpu GROUP BY * LIMIT 3 OFFSET 8 ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error during COUNT: %s" , res . Err )
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","tags": { "host":"server-9","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]} ` {
t . Fatalf ( "unexpected row(0) during COUNT: %s" , s )
}
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT count(value) FROM cpu GROUP BY * LIMIT 3 OFFSET 20 ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error during COUNT: %s" , res . Err )
}
2015-02-24 06:16:06 +00:00
}
2015-02-10 01:05:03 +00:00
// Ensure the server can execute a wildcard query and return the data correctly.
func TestServer_ExecuteWildcardQuery ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "raw" , Duration : 1 * time . Hour } )
s . SetDefaultRetentionPolicy ( "foo" , "raw" )
s . CreateUser ( "susy" , "pass" , false )
// Write series with one point to the database.
2015-02-10 17:00:09 +00:00
// We deliberatly write one value per insert as we need to create each field in a predicatable order for testing.
2015-02-23 22:37:10 +00:00
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" } , Timestamp : mustParseTime ( "2000-01-01T00:00:00Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 10 ) } } } )
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" } , Timestamp : mustParseTime ( "2000-01-01T00:00:10Z" ) , Fields : map [ string ] interface { } { "val-x" : 20 } } } )
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" } , Timestamp : mustParseTime ( "2000-01-01T00:00:20Z" ) , Fields : map [ string ] interface { } { "value" : 30 , "val-x" : 40 } } } )
2015-02-10 01:05:03 +00:00
// Select * (wildcard).
results := s . ExecuteQuery ( MustParseQuery ( ` SELECT * FROM cpu ` ) , "foo" , nil )
2015-02-09 08:39:42 +00:00
if res := results . Results [ 0 ] ; res . Err != nil {
2015-02-10 01:05:03 +00:00
t . Fatalf ( "unexpected error during SELECT *: %s" , res . Err )
2015-02-24 03:51:43 +00:00
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","columns":["time","value","val-x"],"values":[["2000-01-01T00:00:00Z",10,0],["2000-01-01T00:00:10Z",0,20],["2000-01-01T00:00:20Z",30,40]]}]} ` {
2015-02-10 01:05:03 +00:00
t . Fatalf ( "unexpected results during SELECT *: %s" , s )
2015-02-09 08:39:42 +00:00
}
2014-12-23 06:18:05 +00:00
}
2015-02-17 23:29:23 +00:00
// Ensure the server can execute a wildcard GROUP BY
func TestServer_ExecuteWildcardGroupBy ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "raw" , Duration : 1 * time . Hour } )
s . SetDefaultRetentionPolicy ( "foo" , "raw" )
s . CreateUser ( "susy" , "pass" , false )
// Write series with one point to the database.
// We deliberatly write one value per insert as we need to create each field in a predicatable order for testing.
2015-02-24 03:11:50 +00:00
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" } , Timestamp : mustParseTime ( "2000-01-01T00:00:00Z" ) , Fields : map [ string ] interface { } { "value" : float64 ( 10 ) } } } )
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" } , Timestamp : mustParseTime ( "2000-01-01T00:00:10Z" ) , Fields : map [ string ] interface { } { "value" : 20 } } } )
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-west" } , Timestamp : mustParseTime ( "2000-01-01T00:00:20Z" ) , Fields : map [ string ] interface { } { "value" : 30 } } } )
2015-02-17 23:29:23 +00:00
2015-02-18 07:04:46 +00:00
// GROUP BY * (wildcard).
results := s . ExecuteQuery ( MustParseQuery ( ` SELECT mean(value) FROM cpu GROUP BY * ` ) , "foo" , nil )
2015-02-17 23:29:23 +00:00
if res := results . Results [ 0 ] ; res . Err != nil {
2015-02-18 07:04:46 +00:00
t . Fatalf ( "unexpected error during GROUP BY *: %s" , res . Err )
2015-02-24 03:17:56 +00:00
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","tags": { "region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",15]]}, { "name":"cpu","tags": { "region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",30]]}]} ` {
2015-02-18 07:04:46 +00:00
t . Fatalf ( "unexpected results during SELECT *: %s" , s )
}
// GROUP BY * (wildcard) with time.
results = s . ExecuteQuery ( MustParseQuery ( ` SELECT mean(value) FROM cpu GROUP BY *,time(1m) ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error during GROUP BY *: %s" , res . Err )
2015-02-24 03:17:56 +00:00
} else if s := mustMarshalJSON ( res ) ; s != ` { "series":[ { "name":"cpu","tags": { "region":"us-east"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",15]]}, { "name":"cpu","tags": { "region":"us-west"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",30]]}]} ` {
2015-02-18 07:04:46 +00:00
t . Fatalf ( "unexpected results during SELECT *: %s" , s )
2015-02-17 23:29:23 +00:00
}
}
2015-01-10 15:48:50 +00:00
func TestServer_CreateShardGroupIfNotExist ( t * testing . T ) {
2014-12-23 06:18:05 +00:00
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
2014-12-23 15:47:32 +00:00
if err := s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "bar" } ) ; err != nil {
2014-12-23 06:18:05 +00:00
t . Fatal ( err )
}
2015-01-10 15:48:50 +00:00
if err := s . CreateShardGroupIfNotExists ( "foo" , "bar" , time . Time { } ) ; err != nil {
2014-12-23 06:18:05 +00:00
t . Fatal ( err )
}
2015-02-11 04:26:33 +00:00
// Restart the server to ensure the shard group is not lost.
s . Restart ( )
2015-01-10 15:48:50 +00:00
if a , err := s . ShardGroups ( "foo" ) ; err != nil {
2014-12-23 15:47:32 +00:00
t . Fatal ( err )
2015-01-10 15:48:50 +00:00
} else if len ( a ) != 1 {
t . Fatalf ( "expected 1 shard group but found %d" , len ( a ) )
2014-12-23 06:18:05 +00:00
}
}
2015-02-11 00:50:00 +00:00
func TestServer_DeleteShardGroup ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
if err := s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "bar" } ) ; err != nil {
t . Fatal ( err )
}
if err := s . CreateShardGroupIfNotExists ( "foo" , "bar" , time . Time { } ) ; err != nil {
t . Fatal ( err )
}
// Get the new shard's ID.
var g [ ] * influxdb . ShardGroup
g , err := s . ShardGroups ( "foo" )
if err != nil {
t . Fatal ( err )
} else if len ( g ) != 1 {
t . Fatalf ( "expected 1 shard group but found %d" , len ( g ) )
}
id := g [ 0 ] . ID
// Delete the shard group and verify it's gone.
if err := s . DeleteShardGroup ( "foo" , "bar" , id ) ; err != nil {
t . Fatal ( err )
}
g , err = s . ShardGroups ( "foo" )
if err != nil {
t . Fatal ( err )
} else if len ( g ) != 0 {
t . Fatalf ( "expected 0 shard group but found %d" , len ( g ) )
}
}
2015-01-23 09:44:56 +00:00
/ * TODO ( benbjohnson ) : Change test to not expose underlying series ids directly .
2014-12-23 06:18:05 +00:00
func TestServer_Measurements ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "foo" )
2015-01-12 20:10:26 +00:00
s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "mypolicy" , Duration : 1 * time . Hour } )
2014-12-23 15:47:32 +00:00
s . CreateUser ( "susy" , "pass" , false )
2014-12-23 06:18:05 +00:00
// Write series with one point to the database.
timestamp := mustParseTime ( "2000-01-01T00:00:00Z" )
tags := map [ string ] string { "host" : "servera.influx.com" , "region" : "uswest" }
values := map [ string ] interface { } { "value" : 23.2 }
2015-02-23 22:37:10 +00:00
index , err := s . WriteSeries ( "foo" , "mypolicy" , [ ] influxdb . Point { influxdb . Point { Name : "cpu_load" , Tags : tags , Timestamp : timestamp , Fields : values } } )
2015-01-13 17:16:43 +00:00
if err != nil {
2014-12-23 06:18:05 +00:00
t . Fatal ( err )
2015-01-13 17:16:43 +00:00
} else if err = s . Sync ( index ) ; err != nil {
2015-01-13 15:00:30 +00:00
t . Fatalf ( "sync error: %s" , err )
2014-12-23 06:18:05 +00:00
}
2014-12-30 22:50:55 +00:00
expectedMeasurementNames := [ ] string { "cpu_load" }
2014-12-30 21:45:58 +00:00
expectedSeriesIDs := influxdb . SeriesIDs ( [ ] uint32 { uint32 ( 1 ) } )
names := s . MeasurementNames ( "foo" )
if ! reflect . DeepEqual ( names , expectedMeasurementNames ) {
t . Fatalf ( "Mesurements not the same:\n exp: %s\n got: %s" , expectedMeasurementNames , names )
}
ids := s . MeasurementSeriesIDs ( "foo" , "foo" )
2015-01-23 09:44:56 +00:00
if ! reflect . DeepEqual ( ids , expectedSeriesIDs ) {
2015-01-11 23:14:22 +00:00
t . Fatalf ( "Series IDs not the same:\n exp: %v\n got: %v" , expectedSeriesIDs , ids )
2014-12-23 06:18:05 +00:00
}
2014-12-24 04:46:54 +00:00
s . Restart ( )
2014-12-30 21:45:58 +00:00
names = s . MeasurementNames ( "foo" )
if ! reflect . DeepEqual ( names , expectedMeasurementNames ) {
t . Fatalf ( "Mesurements not the same:\n exp: %s\n got: %s" , expectedMeasurementNames , names )
}
ids = s . MeasurementSeriesIDs ( "foo" , "foo" )
2015-01-23 09:44:56 +00:00
if ! reflect . DeepEqual ( ids , expectedSeriesIDs ) {
2015-01-11 23:14:22 +00:00
t . Fatalf ( "Series IDs not the same:\n exp: %v\n got: %v" , expectedSeriesIDs , ids )
2014-12-23 06:18:05 +00:00
}
}
2015-01-23 09:44:56 +00:00
* /
2014-12-23 06:18:05 +00:00
2015-01-18 21:45:22 +00:00
// Ensure the server can convert a measurement into its normalized form.
func TestServer_NormalizeMeasurement ( t * testing . T ) {
var tests = [ ] struct {
in string // input string
db string // current database
out string // normalized string
err string // error, if any
} {
{ in : ` cpu ` , db : ` db0 ` , out : ` "db0"."rp0"."cpu" ` } ,
{ in : ` "".cpu ` , db : ` db0 ` , out : ` "db0"."rp0"."cpu" ` } ,
{ in : ` "rp0".cpu ` , db : ` db0 ` , out : ` "db0"."rp0"."cpu" ` } ,
{ in : ` ""."".cpu ` , db : ` db0 ` , out : ` "db0"."rp0"."cpu" ` } ,
{ in : ` ""..cpu ` , db : ` db0 ` , out : ` "db0"."rp0"."cpu" ` } ,
{ in : ` "db1"..cpu ` , db : ` db0 ` , out : ` "db1"."rp1"."cpu" ` } ,
{ in : ` "db1"."rp1".cpu ` , db : ` db0 ` , out : ` "db1"."rp1"."cpu" ` } ,
{ in : ` "db1"."rp2".cpu ` , db : ` db0 ` , out : ` "db1"."rp2"."cpu" ` } ,
{ in : ` ` , err : ` invalid measurement: ` } ,
{ in : ` "foo"."bar"."baz"."bat" ` , err : ` invalid measurement: "foo"."bar"."baz"."bat" ` } ,
{ in : ` "no_db"..cpu ` , db : ` ` , err : ` database not found: no_db ` } ,
{ in : ` "db2"..cpu ` , db : ` ` , err : ` default retention policy not set for: db2 ` } ,
{ in : ` "db2"."no_policy".cpu ` , db : ` ` , err : ` retention policy does not exist: db2.no_policy ` } ,
}
// Create server with a variety of databases, retention policies, and measurements
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Default database with one policy.
s . CreateDatabase ( "db0" )
s . CreateRetentionPolicy ( "db0" , & influxdb . RetentionPolicy { Name : "rp0" } )
s . SetDefaultRetentionPolicy ( "db0" , "rp0" )
// Another database with two policies.
s . CreateDatabase ( "db1" )
s . CreateRetentionPolicy ( "db1" , & influxdb . RetentionPolicy { Name : "rp1" } )
s . CreateRetentionPolicy ( "db1" , & influxdb . RetentionPolicy { Name : "rp2" } )
s . SetDefaultRetentionPolicy ( "db1" , "rp1" )
// Another database with no policies.
s . CreateDatabase ( "db2" )
// Execute the tests
for i , tt := range tests {
out , err := s . NormalizeMeasurement ( tt . in , tt . db )
if tt . err != errstr ( err ) {
t . Errorf ( "%d. %s/%s: error: exp: %s, got: %s" , i , tt . db , tt . in , tt . err , errstr ( err ) )
} else if tt . out != out {
t . Errorf ( "%d. %s/%s: out: exp: %s, got: %s" , i , tt . db , tt . in , tt . out , out )
}
}
}
// Ensure the server can normalize all statements in query.
func TestServer_NormalizeQuery ( t * testing . T ) {
var tests = [ ] struct {
in string // input query
db string // default database
out string // output query
err string // error, if any
} {
{
in : ` SELECT cpu.value FROM cpu ` , db : ` db0 ` ,
out : ` SELECT "db0"."rp0"."cpu"."value" FROM "db0"."rp0"."cpu" ` ,
} ,
{
in : ` SELECT value FROM cpu ` , db : ` no_db ` , err : ` database not found: no_db ` ,
} ,
}
// Start server with database & retention policy.
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
s . CreateDatabase ( "db0" )
s . CreateRetentionPolicy ( "db0" , & influxdb . RetentionPolicy { Name : "rp0" } )
s . SetDefaultRetentionPolicy ( "db0" , "rp0" )
// Execute the tests
for i , tt := range tests {
out := MustParseQuery ( tt . in )
2015-01-28 03:56:30 +00:00
err := s . NormalizeStatement ( out . Statements [ 0 ] , tt . db )
2015-01-18 21:45:22 +00:00
if tt . err != errstr ( err ) {
t . Errorf ( "%d. %s/%s: error: exp: %s, got: %s" , i , tt . db , tt . in , tt . err , errstr ( err ) )
} else if err == nil && tt . out != out . String ( ) {
t . Errorf ( "%d. %s/%s:\n\nexp: %s\n\ngot: %s\n\n" , i , tt . db , tt . in , tt . out , out . String ( ) )
}
}
}
2015-01-20 02:44:47 +00:00
// Ensure the server can create a continuous query
func TestServer_CreateContinuousQuery ( t * testing . T ) {
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create the "foo" database.
if err := s . CreateDatabase ( "foo" ) ; err != nil {
t . Fatal ( err )
}
if err := s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "bar" } ) ; err != nil {
t . Fatal ( err )
}
2015-02-08 11:06:30 +00:00
s . SetDefaultRetentionPolicy ( "foo" , "bar" )
2015-01-20 02:44:47 +00:00
// create and check
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END"
stmt , err := influxql . NewParser ( strings . NewReader ( q ) ) . ParseStatement ( )
if err != nil {
t . Fatalf ( "error parsing query %s" , err . Error ( ) )
}
cq := stmt . ( * influxql . CreateContinuousQueryStatement )
if err := s . CreateContinuousQuery ( cq ) ; err != nil {
t . Fatalf ( "error creating continuous query %s" , err . Error ( ) )
}
queries := s . ContinuousQueries ( "foo" )
cqObj , _ := influxdb . NewContinuousQuery ( q )
expected := [ ] * influxdb . ContinuousQuery { cqObj }
2015-02-08 11:06:30 +00:00
if mustMarshalJSON ( expected ) != mustMarshalJSON ( queries ) {
t . Fatalf ( "query not saved:\n\texp: %s\n\tgot: %s" , mustMarshalJSON ( expected ) , mustMarshalJSON ( queries ) )
2015-01-20 02:44:47 +00:00
}
s . Restart ( )
// check again
queries = s . ContinuousQueries ( "foo" )
if ! reflect . DeepEqual ( queries , expected ) {
t . Fatalf ( "query not saved:\n\texp: %s\ngot: %s" , mustMarshalJSON ( expected ) , mustMarshalJSON ( queries ) )
}
}
// Ensure the server prevents a duplicate named continuous query from being created
func TestServer_CreateContinuousQuery_ErrContinuousQueryExists ( t * testing . T ) {
2015-01-25 21:41:39 +00:00
t . Skip ( "pending" )
2015-01-20 02:44:47 +00:00
}
// Ensure the server returns an error when creating a continuous query on a database that doesn't exist
2015-02-09 12:13:25 +00:00
func TestServer_CreateContinuousQuery_ErrDatabaseNotFound ( t * testing . T ) {
2015-01-25 21:41:39 +00:00
t . Skip ( "pending" )
2015-01-20 02:44:47 +00:00
}
// Ensure the server returns an error when creating a continuous query on a retention policy that doesn't exist
2015-02-09 12:13:25 +00:00
func TestServer_CreateContinuousQuery_ErrRetentionPolicyNotFound ( t * testing . T ) {
t . Skip ( "pending" )
}
2015-01-20 02:44:47 +00:00
2015-02-09 12:13:25 +00:00
func TestServer_CreateContinuousQuery_ErrInfinteLoop ( t * testing . T ) {
2015-01-25 21:41:39 +00:00
t . Skip ( "pending" )
}
// Ensure
func TestServer_RunContinuousQueries ( t * testing . T ) {
2015-02-21 04:45:48 +00:00
t . Skip ( )
2015-01-25 21:41:39 +00:00
s := OpenServer ( NewMessagingClient ( ) )
defer s . Close ( )
// Create the "foo" database.
if err := s . CreateDatabase ( "foo" ) ; err != nil {
t . Fatal ( err )
}
2015-01-28 05:11:48 +00:00
if err := s . CreateRetentionPolicy ( "foo" , & influxdb . RetentionPolicy { Name : "raw" } ) ; err != nil {
2015-01-25 21:41:39 +00:00
t . Fatal ( err )
}
2015-01-28 05:11:48 +00:00
s . SetDefaultRetentionPolicy ( "foo" , "raw" )
2015-01-25 21:41:39 +00:00
2015-02-09 07:20:47 +00:00
s . RecomputePreviousN = 50
s . RecomputeNoOlderThan = time . Second
2015-01-25 21:41:39 +00:00
s . ComputeRunsPerInterval = 5
2015-02-09 07:20:47 +00:00
s . ComputeNoMoreThan = 2 * time . Millisecond
2015-01-25 21:41:39 +00:00
2015-02-08 11:06:30 +00:00
// create cq and check
2015-02-09 07:20:47 +00:00
q := ` CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT mean(value) INTO cpu_region FROM cpu GROUP BY time(5ms), region END `
2015-01-25 21:41:39 +00:00
stmt , err := influxql . NewParser ( strings . NewReader ( q ) ) . ParseStatement ( )
if err != nil {
t . Fatalf ( "error parsing query %s" , err . Error ( ) )
}
cq := stmt . ( * influxql . CreateContinuousQueryStatement )
if err := s . CreateContinuousQuery ( cq ) ; err != nil {
t . Fatalf ( "error creating continuous query %s" , err . Error ( ) )
}
2015-02-08 11:06:30 +00:00
if err := s . RunContinuousQueries ( ) ; err != nil {
t . Fatalf ( "error running cqs when no data exists: %s" , err . Error ( ) )
}
2015-01-25 21:41:39 +00:00
2015-02-09 07:20:47 +00:00
// set a test time in the middle of a 5 second interval that we can work with
testTime := time . Now ( ) . UTC ( ) . Round ( 5 * time . Millisecond )
if testTime . UnixNano ( ) > time . Now ( ) . UnixNano ( ) {
testTime = testTime . Add ( - 5 * time . Millisecond )
2015-01-28 05:11:48 +00:00
}
2015-02-09 07:20:47 +00:00
testTime . Add ( time . Millisecond * 2 )
2015-01-28 05:11:48 +00:00
2015-02-23 22:37:10 +00:00
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" } , Timestamp : testTime , Fields : map [ string ] interface { } { "value" : float64 ( 30 ) } } } )
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-east" } , Timestamp : testTime . Add ( - time . Millisecond * 5 ) , Fields : map [ string ] interface { } { "value" : float64 ( 20 ) } } } )
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-west" } , Timestamp : testTime , Fields : map [ string ] interface { } { "value" : float64 ( 100 ) } } } )
2015-02-09 07:20:47 +00:00
// Run CQs after a period of time
time . Sleep ( time . Millisecond * 50 )
2015-01-28 05:11:48 +00:00
s . RunContinuousQueries ( )
2015-02-09 07:20:47 +00:00
// give the CQs time to run
time . Sleep ( time . Millisecond * 100 )
verify := func ( num int , exp string ) {
results := s . ExecuteQuery ( MustParseQuery ( ` SELECT mean(mean) FROM cpu_region GROUP BY region ` ) , "foo" , nil )
if res := results . Results [ 0 ] ; res . Err != nil {
t . Fatalf ( "unexpected error verify %d: %s" , num , res . Err )
2015-02-23 05:21:49 +00:00
} else if len ( res . Series ) != 2 {
t . Fatalf ( "unexpected row count on verify %d: %d" , num , len ( res . Series ) )
2015-02-09 07:20:47 +00:00
} else if s := mustMarshalJSON ( res ) ; s != exp {
t . Fatalf ( "unexpected row(0) on verify %d: %s" , num , s )
}
}
// ensure CQ results were saved
2015-02-23 05:21:49 +00:00
verify ( 1 , ` { "series":[ { "name":"cpu_region","tags": { "region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]}, { "name":"cpu_region","tags": { "region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",100]]}]} ` )
2015-02-09 07:20:47 +00:00
// ensure that repeated runs don't cause duplicate data
2015-01-28 05:11:48 +00:00
s . RunContinuousQueries ( )
2015-02-23 05:21:49 +00:00
verify ( 2 , ` { "series":[ { "name":"cpu_region","tags": { "region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]}, { "name":"cpu_region","tags": { "region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",100]]}]} ` )
2015-02-09 07:20:47 +00:00
// ensure that data written into a previous window is picked up and the result recomputed.
time . Sleep ( time . Millisecond * 2 )
2015-02-23 22:37:10 +00:00
s . MustWriteSeries ( "foo" , "raw" , [ ] influxdb . Point { { Name : "cpu" , Tags : map [ string ] string { "region" : "us-west" } , Timestamp : testTime . Add ( - time . Millisecond ) , Fields : map [ string ] interface { } { "value" : float64 ( 50 ) } } } )
2015-01-28 05:11:48 +00:00
s . RunContinuousQueries ( )
2015-02-09 07:20:47 +00:00
// give CQs time to run
time . Sleep ( time . Millisecond * 50 )
2015-02-23 05:21:49 +00:00
verify ( 3 , ` { "series":[ { "name":"cpu_region","tags": { "region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]}, { "name":"cpu_region","tags": { "region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",75]]}]} ` )
2015-01-20 02:44:47 +00:00
}
2014-12-24 04:46:54 +00:00
func mustMarshalJSON ( v interface { } ) string {
b , err := json . Marshal ( v )
if err != nil {
panic ( "marshal: " + err . Error ( ) )
2014-12-23 06:18:05 +00:00
}
2014-12-24 04:46:54 +00:00
return string ( b )
}
func measurementsEqual ( l influxdb . Measurements , r influxdb . Measurements ) bool {
if mustMarshalJSON ( l ) == mustMarshalJSON ( r ) {
return true
2014-12-23 06:18:05 +00:00
}
2014-12-24 04:46:54 +00:00
return false
2014-12-23 06:18:05 +00:00
}
func TestServer_SeriesByTagNames ( t * testing . T ) { t . Skip ( "pending" ) }
func TestServer_SeriesByTagValues ( t * testing . T ) { t . Skip ( "pending" ) }
func TestDatabase_TagNames ( t * testing . T ) { t . Skip ( "pending" ) }
func TestServer_TagNamesBySeries ( t * testing . T ) { t . Skip ( "pending" ) }
func TestServer_TagValues ( t * testing . T ) { t . Skip ( "pending" ) }
func TestServer_TagValuesBySeries ( t * testing . T ) { t . Skip ( "pending" ) }
2015-01-29 21:07:43 +00:00
// Point JSON Unmarshal tests
2015-01-29 02:30:00 +00:00
2015-01-29 21:07:43 +00:00
func TestbatchWrite_UnmarshalEpoch ( t * testing . T ) {
2015-01-29 02:30:00 +00:00
var (
now = time . Now ( )
nanos = now . UnixNano ( )
micros = nanos / int64 ( time . Microsecond )
millis = nanos / int64 ( time . Millisecond )
seconds = nanos / int64 ( time . Second )
minutes = nanos / int64 ( time . Minute )
hours = nanos / int64 ( time . Hour )
)
tests := [ ] struct {
name string
epoch int64
} {
{ name : "nanos" , epoch : nanos } ,
{ name : "micros" , epoch : micros } ,
{ name : "millis" , epoch : millis } ,
{ name : "seconds" , epoch : seconds } ,
{ name : "minutes" , epoch : minutes } ,
{ name : "hours" , epoch : hours } ,
}
for _ , test := range tests {
2015-01-29 21:07:43 +00:00
json := fmt . Sprintf ( ` "points": [ { timestamp: "%d"} ` , test . epoch )
log . Println ( json )
t . Fatal ( "foo" )
2015-01-29 02:30:00 +00:00
}
}
2015-02-21 20:46:16 +00:00
// Ensure the server will skip over replayed log entries and not blow up.
func TestServer_Replay ( t * testing . T ) {
c := NewMessagingClient ( )
s := OpenServer ( c )
defer s . Close ( )
// Record all messages through the client.
var messages [ ] * messaging . Message
c . PublishFunc = func ( m * messaging . Message ) ( uint64 , error ) {
messages = append ( messages , m )
c . c <- m
return m . Index , nil
}
// Create a new node.
u , _ := url . Parse ( "http://localhost:80000" )
if err := s . CreateDataNode ( u ) ; err != nil {
t . Fatal ( err )
}
s . Restart ( )
// Replay messages through client.
for _ , m := range messages {
c . c <- m
}
// Sleep so it has a moment to process & ignore.
time . Sleep ( 100 * time . Millisecond )
// NOTE: There is no way to introspect into the server to see that
// messages are being dropped. This test exists to make sure the server
// doesn't crash on retry.
}
2014-10-22 05:32:19 +00:00
// Server is a wrapping test struct for influxdb.Server.
type Server struct {
* influxdb . Server
}
// NewServer returns a new test server instance.
2014-12-30 22:46:50 +00:00
func NewServer ( ) * Server {
2015-02-05 22:54:32 +00:00
s := influxdb . NewServer ( )
s . SetAuthenticationEnabled ( false )
return & Server { s }
2014-10-22 05:32:19 +00:00
}
// OpenServer returns a new, open test server instance.
func OpenServer ( client influxdb . MessagingClient ) * Server {
2015-01-10 15:48:50 +00:00
s := OpenUninitializedServer ( client )
if err := s . Initialize ( & url . URL { Host : "127.0.0.1:8080" } ) ; err != nil {
panic ( err . Error ( ) )
}
return s
}
// OpenUninitializedServer returns a new, uninitialized, open test server instance.
func OpenUninitializedServer ( client influxdb . MessagingClient ) * Server {
2014-12-30 22:46:50 +00:00
s := NewServer ( )
2014-10-22 05:32:19 +00:00
if err := s . Open ( tempfile ( ) ) ; err != nil {
panic ( err . Error ( ) )
}
2014-12-30 22:46:50 +00:00
if err := s . SetClient ( client ) ; err != nil {
panic ( err . Error ( ) )
}
2014-10-22 05:32:19 +00:00
return s
}
2015-01-23 09:44:56 +00:00
// OpenDefaultServer opens a server and creates a default db & retention policy.
func OpenDefaultServer ( client influxdb . MessagingClient ) * Server {
s := OpenServer ( client )
if err := s . CreateDatabase ( "db" ) ; err != nil {
panic ( err . Error ( ) )
2015-01-27 02:14:07 +00:00
} else if err = s . CreateRetentionPolicy ( "db" , & influxdb . RetentionPolicy { Name : "raw" , Duration : 1 * time . Hour } ) ; err != nil {
2015-01-23 09:44:56 +00:00
panic ( err . Error ( ) )
} else if err = s . SetDefaultRetentionPolicy ( "db" , "raw" ) ; err != nil {
panic ( err . Error ( ) )
}
return s
}
2014-11-05 05:32:17 +00:00
// Restart stops and restarts the server.
func ( s * Server ) Restart ( ) {
2014-12-30 22:46:50 +00:00
path , client := s . Path ( ) , s . Client ( )
// Stop the server.
2014-11-05 05:32:17 +00:00
if err := s . Server . Close ( ) ; err != nil {
panic ( "close: " + err . Error ( ) )
}
2014-12-30 22:46:50 +00:00
// Open and reset the client.
2014-11-05 05:32:17 +00:00
if err := s . Server . Open ( path ) ; err != nil {
panic ( "open: " + err . Error ( ) )
}
2014-12-30 22:46:50 +00:00
if err := s . Server . SetClient ( client ) ; err != nil {
panic ( "client: " + err . Error ( ) )
}
2014-11-05 05:32:17 +00:00
}
2014-10-22 05:32:19 +00:00
// Close shuts down the server and removes all temporary files.
func ( s * Server ) Close ( ) {
defer os . RemoveAll ( s . Path ( ) )
s . Server . Close ( )
}
2015-01-14 23:44:09 +00:00
// MustWriteSeries writes series data and waits for the data to be applied.
// Returns the messaging index for the write.
2015-01-15 05:21:55 +00:00
func ( s * Server ) MustWriteSeries ( database , retentionPolicy string , points [ ] influxdb . Point ) uint64 {
index , err := s . WriteSeries ( database , retentionPolicy , points )
2015-01-14 23:44:09 +00:00
if err != nil {
panic ( err . Error ( ) )
} else if err = s . Sync ( index ) ; err != nil {
panic ( "sync error: " + err . Error ( ) )
}
return index
}
2014-10-22 05:32:19 +00:00
// MessagingClient represents a test client for the messaging broker.
type MessagingClient struct {
2014-10-24 05:38:03 +00:00
index uint64
c chan * messaging . Message
2015-01-10 15:48:50 +00:00
PublishFunc func ( * messaging . Message ) ( uint64 , error )
2015-02-12 21:38:33 +00:00
CreateReplicaFunc func ( replicaID uint64 , connectURL * url . URL ) error
2015-01-10 15:48:50 +00:00
DeleteReplicaFunc func ( replicaID uint64 ) error
SubscribeFunc func ( replicaID , topicID uint64 ) error
UnsubscribeFunc func ( replicaID , topicID uint64 ) error
2014-10-22 05:32:19 +00:00
}
// NewMessagingClient returns a new instance of MessagingClient.
func NewMessagingClient ( ) * MessagingClient {
2014-10-24 05:38:03 +00:00
c := & MessagingClient { c : make ( chan * messaging . Message , 1 ) }
c . PublishFunc = c . send
2015-02-12 21:38:33 +00:00
c . CreateReplicaFunc = func ( replicaID uint64 , connectURL * url . URL ) error { return nil }
2015-01-10 15:48:50 +00:00
c . DeleteReplicaFunc = func ( replicaID uint64 ) error { return nil }
c . SubscribeFunc = func ( replicaID , topicID uint64 ) error { return nil }
c . UnsubscribeFunc = func ( replicaID , topicID uint64 ) error { return nil }
2014-10-24 05:38:03 +00:00
return c
}
// Publish attaches an autoincrementing index to the message.
// This function also execute's the client's PublishFunc mock function.
func ( c * MessagingClient ) Publish ( m * messaging . Message ) ( uint64 , error ) {
c . index ++
m . Index = c . index
return c . PublishFunc ( m )
}
// send sends the message through to the channel.
// This is the default value of PublishFunc.
func ( c * MessagingClient ) send ( m * messaging . Message ) ( uint64 , error ) {
c . c <- m
return m . Index , nil
2014-10-22 05:32:19 +00:00
}
2015-01-07 00:21:32 +00:00
// Creates a new replica with a given ID on the broker.
2015-02-12 21:38:33 +00:00
func ( c * MessagingClient ) CreateReplica ( replicaID uint64 , connectURL * url . URL ) error {
return c . CreateReplicaFunc ( replicaID , connectURL )
2015-01-10 15:48:50 +00:00
}
2015-01-07 00:21:32 +00:00
// Deletes an existing replica with a given ID from the broker.
2015-01-10 15:48:50 +00:00
func ( c * MessagingClient ) DeleteReplica ( replicaID uint64 ) error {
return c . DeleteReplicaFunc ( replicaID )
}
// Subscribe adds a subscription to a replica for a topic on the broker.
func ( c * MessagingClient ) Subscribe ( replicaID , topicID uint64 ) error {
return c . SubscribeFunc ( replicaID , topicID )
}
// Unsubscribe removes a subscrition from a replica for a topic on the broker.
func ( c * MessagingClient ) Unsubscribe ( replicaID , topicID uint64 ) error {
return c . UnsubscribeFunc ( replicaID , topicID )
}
2015-01-07 00:21:32 +00:00
2014-10-24 05:38:03 +00:00
// C returns a channel for streaming message.
2014-10-22 05:32:19 +00:00
func ( c * MessagingClient ) C ( ) <- chan * messaging . Message { return c . c }
// tempfile returns a temporary path.
func tempfile ( ) string {
f , _ := ioutil . TempFile ( "" , "influxdb-" )
path := f . Name ( )
f . Close ( )
os . Remove ( path )
return path
}
2014-11-10 02:55:53 +00:00
// mustParseTime parses an IS0-8601 string. Panic on error.
func mustParseTime ( s string ) time . Time {
t , err := time . Parse ( time . RFC3339 , s )
if err != nil {
panic ( err . Error ( ) )
}
return t
}
2015-01-14 23:44:09 +00:00
// MustParseQuery parses an InfluxQL query. Panic on error.
func MustParseQuery ( s string ) * influxql . Query {
q , err := influxql . NewParser ( strings . NewReader ( s ) ) . ParseQuery ( )
if err != nil {
panic ( err . Error ( ) )
}
return q
}
2015-01-23 09:44:56 +00:00
// MustParseSelectStatement parses an InfluxQL select statement. Panic on error.
func MustParseSelectStatement ( s string ) * influxql . SelectStatement {
stmt , err := influxql . NewParser ( strings . NewReader ( s ) ) . ParseStatement ( )
if err != nil {
panic ( err . Error ( ) )
}
return stmt . ( * influxql . SelectStatement )
}
2014-10-30 00:21:17 +00:00
// errstr is an ease-of-use function to convert an error to a string.
func errstr ( err error ) string {
if err != nil {
return err . Error ( )
}
return ""
}
2014-10-22 05:32:19 +00:00
func warn ( v ... interface { } ) { fmt . Fprintln ( os . Stderr , v ... ) }
func warnf ( msg string , v ... interface { } ) { fmt . Fprintf ( os . Stderr , msg + "\n" , v ... ) }