Add broker client publishing.

Ben Johnson 2014-10-23 18:54:12 -06:00
parent dc8a73b857
commit 9c1e9d93bc
13 changed files with 273 additions and 929 deletions

View File

@ -1,13 +1,13 @@
package datastore
package influxdb
import (
type Datastore interface {
ExecuteQuery(user common.User, database string,
ExecuteQuery(user User, database string,
query *parser.SelectQuery, yield func(*protocol.Series) error,
ringFilter func(database, series *string, time *int64) bool) error
@ -28,3 +28,5 @@ type Datastore interface {
DropSeries(database, series string) error

View File

@ -474,3 +474,15 @@ func yieldToProcessor(s *protocol.Series, p engine.Processor, aliases []string)
return true, nil
type FieldLookupError struct {
message string
func NewFieldLookupError(message string) *FieldLookupError {
return &FieldLookupError{message}
func (self FieldLookupError) Error() string {
return self.message

View File

@ -34,37 +34,10 @@ type ShardDatastore struct {
metaStore *metastore.Store
const (
ONE_MEGABYTE = 1024 * 1024
SHARD_DATABASE_DIR = "shard_db_v2"
var (
// This datastore implements the PersistentAtomicInteger interface. All of the persistent
// integers start with this prefix, followed by their name
ATOMIC_INCREMENT_PREFIX = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD}
// NEXT_ID_KEY holds the next id. ids are used to "intern" timeseries and column names
NEXT_ID_KEY = []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
// SERIES_COLUMN_INDEX_PREFIX is the prefix of the series to column names index
SERIES_COLUMN_INDEX_PREFIX = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE}
// DATABASE_SERIES_INDEX_PREFIX is the prefix of the database to series names index
DATABASE_SERIES_INDEX_PREFIX = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
MAX_SEQUENCE = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
// replicateWrite = protocol.Request_REPLICATION_WRITE
TRUE = true
const shardDatabaseDir = "shard_db_v2"
func NewShardDatastore(config *configuration.Configuration, metaStore *metastore.Store) (*ShardDatastore, error) {
baseDbDir := filepath.Join(config.DataDir, SHARD_DATABASE_DIR)
baseDbDir := filepath.Join(config.DataDir, shardDatabaseDir)
err := os.MkdirAll(baseDbDir, 0744)
if err != nil {
return nil, err
@ -292,15 +265,3 @@ func isPointInRange(fieldId, startTime, endTime, point []byte) bool {
time := point[8:16]
return bytes.Equal(id, fieldId) && bytes.Compare(time, startTime) > -1 && bytes.Compare(time, endTime) < 1
type FieldLookupError struct {
message string
func NewFieldLookupError(message string) *FieldLookupError {
return &FieldLookupError{message}
func (self FieldLookupError) Error() string {
return self.message

View File

@ -9,6 +9,9 @@ var (
// ErrServerOpen is returned when opening an already open server.
ErrServerOpen = errors.New("server already open")
// ErrServerClosed is returned when closing an already closed server.
ErrServerClosed = errors.New("server already closed")
// ErrPathRequired is returned when opening a server without a path.
ErrPathRequired = errors.New("path required")

View File

@ -1,679 +0,0 @@
package integration
// import (
// "cluster"
// . "common"
// "configuration"
// "encoding/json"
// "fmt"
// "io/ioutil"
// "net"
// "net/http"
// "os"
// "protocol"
// "runtime"
// "sort"
// "strconv"
// "strings"
// "testing"
// "time"
// "wal"
// . ""
// )
// // Hook up gocheck into the gotest runner.
// func Test(t *testing.T) {
// TestingT(t)
// }
// type CoordinatorSuite struct{}
// var _ = Suite(&CoordinatorSuite{})
// var DEFAULT_CONFIGURATION = &configuration.Configuration{
// ConcurrentShardQueryLimit: 1,
// }
// func init() {
// runtime.GOMAXPROCS(runtime.NumCPU() * 2)
// }
// const (
// SERVER_STARTUP_TIME = time.Second * 2
// REPLICATION_LAG = time.Millisecond * 500
// )
// type WALMock struct {
// cluster.WAL
// }
// func (self *WALMock) AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error) {
// return uint32(1), nil
// }
// func stringToSeries(seriesString string, c *C) *protocol.Series {
// series := &protocol.Series{}
// err := json.Unmarshal([]byte(seriesString), &series)
// c.Assert(err, IsNil)
// return series
// }
// func startAndVerifyCluster(count int, c *C) []*RaftServer {
// firstPort := 0
// servers := make([]*RaftServer, count, count)
// for i := 0; i < count; i++ {
// l, err := net.Listen("tcp4", ":0")
// c.Assert(err, IsNil)
// servers[i] = newConfigAndServer(c)
// if firstPort == 0 {
// firstPort = l.Addr().(*net.TCPAddr).Port
// } else {
// servers[i].config.SeedServers = []string{fmt.Sprintf("http://localhost:%d", firstPort)}
// }
// servers[i].Serve(l)
// // verify that the server is up
// getConfig(servers[i].port, c)
// }
// return servers
// }
// func cleanWithoutDeleting(servers ...*RaftServer) {
// for _, server := range servers {
// server.Close()
// }
// http.DefaultTransport.(*http.Transport).CloseIdleConnections()
// }
// func clean(servers ...*RaftServer) {
// cleanWithoutDeleting(servers...)
// for _, server := range servers {
// os.RemoveAll(server.path)
// }
// }
// func newProtobufClient(connectString string) cluster.ServerConnection {
// return NewProtobufClient(connectString, 0)
// }
// func newConfigAndServer(c *C) *RaftServer {
// path, err := ioutil.TempDir(os.TempDir(), "influxdb")
// c.Assert(err, IsNil)
// setupConfig := &configuration.Configuration{Hostname: "localhost", RaftDir: path, RaftServerPort: 0}
// config := cluster.NewClusterConfiguration(setupConfig, &WALMock{}, nil, newProtobufClient)
// server := NewRaftServer(setupConfig, config)
// return server
// }
// func getConfig(port int, c *C) string {
// host := fmt.Sprintf("localhost:%d", port)
// resp, err := http.Get("http://" + host + "/cluster_config")
// c.Assert(err, Equals, nil)
// defer resp.Body.Close()
// body, err := ioutil.ReadAll(resp.Body)
// c.Assert(err, Equals, nil)
// return string(body)
// }
// func assertConfigContains(port int, contains string, isContained bool, c *C) {
// body := getConfig(port, c)
// c.Assert(strings.Contains(string(body), contains), Equals, isContained)
// }
// func (self *CoordinatorSuite) TestCanCreateCoordinatorWithNoSeed(c *C) {
// server := startAndVerifyCluster(1, c)[0]
// defer clean(server)
// }
// func (self *CoordinatorSuite) TestCanRecover(c *C) {
// server := startAndVerifyCluster(1, c)[0]
// defer clean(server)
// path, port := server.path, server.port
// server.CreateDatabase("db1", uint8(1))
// assertConfigContains(server.port, "db1", true, c)
// cleanWithoutDeleting(server)
// server = newConfigAndServer(c)
// // reset the path and port to the previous server and remove the
// // path that was created by newConfigAndServer
// os.RemoveAll(server.path)
// server.path = path
// server.port = port
// defer clean(server)
// server.ListenAndServe()
// time.Sleep(time.Second)
// assertConfigContains(server.port, "db1", true, c)
// }
// func (self *CoordinatorSuite) TestCanSnapshot(c *C) {
// server := startAndVerifyCluster(1, c)[0]
// // defer clean(server)
// path, port, name := server.path, server.port,
// for i := 0; i < 1000; i++ {
// dbname := fmt.Sprintf("db%d", i)
// server.CreateDatabase(dbname, uint8(1))
// assertConfigContains(server.port, dbname, true, c)
// }
// size, err := GetFileSize(server.raftServer.LogPath())
// c.Assert(err, IsNil)
// server.ForceLogCompaction()
// newSize, err := GetFileSize(server.raftServer.LogPath())
// c.Assert(err, IsNil)
// c.Assert(newSize < size, Equals, true)
// fmt.Printf("size of %s shrinked from %d to %d\n", server.raftServer.LogPath(), size, newSize)
// cleanWithoutDeleting(server)
// server = newConfigAndServer(c)
// // reset the path and port to the previous server and remove the
// // path that was created by newConfigAndServer
// os.RemoveAll(server.path)
// server.path = path
// server.port = port
// = name
// // defer clean(server)
// err = server.ListenAndServe()
// c.Assert(err, IsNil)
// for i := 0; i < 1000; i++ {
// dbname := fmt.Sprintf("db%d", i)
// assertConfigContains(server.port, dbname, true, c)
// }
// // make another server join the cluster
// server2 := newConfigAndServer(c)
// defer clean(server2)
// l, err := net.Listen("tcp4", ":0")
// c.Assert(err, IsNil)
// server2.config.SeedServers = []string{fmt.Sprintf("http://localhost:%d", server.port)}
// server2.Serve(l)
// for i := 0; i < 1000; i++ {
// dbname := fmt.Sprintf("db%d", i)
// assertConfigContains(server2.port, dbname, true, c)
// }
// }
// func (self *CoordinatorSuite) TestCanCreateCoordinatorsAndReplicate(c *C) {
// servers := startAndVerifyCluster(2, c)
// defer clean(servers...)
// err := servers[0].CreateDatabase("db2", uint8(1))
// c.Assert(err, IsNil)
// time.Sleep(REPLICATION_LAG)
// assertConfigContains(servers[0].port, "db2", true, c)
// assertConfigContains(servers[1].port, "db2", true, c)
// }
// func (self *CoordinatorSuite) TestDoWriteOperationsFromNonLeaderServer(c *C) {
// servers := startAndVerifyCluster(2, c)
// err := servers[1].CreateDatabase("db3", uint8(1))
// c.Assert(err, Equals, nil)
// time.Sleep(REPLICATION_LAG)
// assertConfigContains(servers[0].port, "db3", true, c)
// assertConfigContains(servers[1].port, "db3", true, c)
// }
// func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) {
// server := startAndVerifyCluster(1, c)[0]
// defer clean(server)
// server.CreateDatabase("db4", uint8(1))
// assertConfigContains(server.port, "db4", true, c)
// server2 := newConfigAndServer(c)
// defer clean(server2)
// l, err := net.Listen("tcp4", ":0")
// c.Assert(err, IsNil)
// server2.config.SeedServers = []string{fmt.Sprintf("http://localhost:%d", server.port)}
// server2.Serve(l)
// assertConfigContains(server2.port, "db4", true, c)
// }
// func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) {
// servers := startAndVerifyCluster(3, c)
// defer clean(servers...)
// err := servers[0].CreateDatabase("db5", uint8(1))
// c.Assert(err, Equals, nil)
// time.Sleep(REPLICATION_LAG)
// assertConfigContains(servers[0].port, "db5", true, c)
// assertConfigContains(servers[1].port, "db5", true, c)
// assertConfigContains(servers[2].port, "db5", true, c)
// leader, _ := servers[1].leaderConnectString()
// c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", servers[0].port))
// // kill the leader
// clean(servers[0])
// // make sure an election will start
// time.Sleep(3 * time.Second)
// leader, _ = servers[1].leaderConnectString()
// c.Assert(leader, Not(Equals), fmt.Sprintf("http://localhost:%d", servers[0].port))
// err = servers[1].CreateDatabase("db6", uint8(1))
// c.Assert(err, Equals, nil)
// time.Sleep(REPLICATION_LAG)
// assertConfigContains(servers[1].port, "db6", true, c)
// assertConfigContains(servers[2].port, "db6", true, c)
// }
// func (self *CoordinatorSuite) TestAutomaticDbCreations(c *C) {
// servers := startAndVerifyCluster(3, c)
// defer clean(servers...)
// coordinator := NewCoordinatorImpl(DEFAULT_CONFIGURATION, servers[0], servers[0].clusterConfig)
// time.Sleep(REPLICATION_LAG)
// // Root user is created
// var root User
// var err error
// // we should have the root user
// root, err = coordinator.AuthenticateClusterAdmin("root", "root")
// c.Assert(err, IsNil)
// c.Assert(root.IsClusterAdmin(), Equals, true)
// // can create db users
// c.Assert(coordinator.CreateDbUser(root, "db1", "db_user", "pass"), IsNil)
// time.Sleep(REPLICATION_LAG)
// // the db should be in the index now
// for _, server := range servers {
// coordinator := NewCoordinatorImpl(DEFAULT_CONFIGURATION, server, server.clusterConfig)
// dbs, err := coordinator.ListDatabases(root)
// c.Assert(err, IsNil)
// c.Assert(dbs, DeepEquals, []*cluster.Database{&cluster.Database{"db1", 1}})
// }
// // if the db is dropped it should remove the users as well
// c.Assert(servers[0].DropDatabase("db1"), IsNil)
// _, err = coordinator.AuthenticateDbUser("db1", "db_user", "pass")
// c.Assert(err, ErrorMatches, ".*Invalid.*")
// }
// func (self *CoordinatorSuite) TestAdminOperations(c *C) {
// servers := startAndVerifyCluster(3, c)
// defer clean(servers...)
// coordinator := NewCoordinatorImpl(DEFAULT_CONFIGURATION, servers[0], servers[0].clusterConfig)
// time.Sleep(REPLICATION_LAG)
// // Root user is created
// var root User
// var err error
// // we should have the root user
// root, err = coordinator.AuthenticateClusterAdmin("root", "root")
// c.Assert(err, IsNil)
// c.Assert(root.IsClusterAdmin(), Equals, true)
// c.Assert(root.HasWriteAccess("foobar"), Equals, true)
// c.Assert(root.HasReadAccess("foobar"), Equals, true)
// // Can change it's own password
// c.Assert(coordinator.ChangeClusterAdminPassword(root, "root", "password"), Equals, nil)
// root, err = coordinator.AuthenticateClusterAdmin("root", "password")
// c.Assert(err, IsNil)
// c.Assert(root.IsClusterAdmin(), Equals, true)
// // Can create other cluster admin
// c.Assert(coordinator.CreateClusterAdminUser(root, "another_cluster_admin", "pass"), IsNil)
// u, err := coordinator.AuthenticateClusterAdmin("another_cluster_admin", "pass")
// c.Assert(err, IsNil)
// c.Assert(u.IsClusterAdmin(), Equals, true)
// // can get other cluster admin
// admins, err := coordinator.ListClusterAdmins(root)
// c.Assert(err, IsNil)
// sort.Strings(admins)
// c.Assert(admins, DeepEquals, []string{"another_cluster_admin", "root"})
// // can create db users
// c.Assert(coordinator.CreateDbUser(root, "db1", "db_user", "db_pass"), IsNil)
// u, err = coordinator.AuthenticateDbUser("db1", "db_user", "db_pass")
// c.Assert(err, IsNil)
// c.Assert(u.IsClusterAdmin(), Equals, false)
// c.Assert(u.IsDbAdmin("db1"), Equals, false)
// // can get properties of db users
// dbUser, err := coordinator.GetDbUser(root, "db1", "db_user")
// c.Assert(err, IsNil)
// c.Assert(dbUser, NotNil)
// c.Assert(dbUser.GetName(), Equals, "db_user")
// c.Assert(dbUser.IsDbAdmin("db1"), Equals, false)
// dbUser, err = coordinator.GetDbUser(root, "db1", "invalid_user")
// c.Assert(err, NotNil)
// c.Assert(err, ErrorMatches, "Invalid username invalid_user")
// // can make db users db admins
// c.Assert(coordinator.SetDbAdmin(root, "db1", "db_user", true), IsNil)
// u, err = coordinator.AuthenticateDbUser("db1", "db_user", "db_pass")
// c.Assert(err, IsNil)
// c.Assert(u.IsDbAdmin("db1"), Equals, true)
// // can list db users
// dbUsers, err := coordinator.ListDbUsers(root, "db1")
// c.Assert(err, IsNil)
// c.Assert(dbUsers, HasLen, 1)
// c.Assert(dbUsers[0].GetName(), Equals, "db_user")
// c.Assert(dbUsers[0].IsDbAdmin("db1"), Equals, true)
// // can delete cluster admins and db users
// c.Assert(coordinator.DeleteDbUser(root, "db1", "db_user"), IsNil)
// c.Assert(coordinator.DeleteClusterAdminUser(root, "another_cluster_admin"), IsNil)
// }
// func (self *CoordinatorSuite) TestContinuousQueryOperations(c *C) {
// servers := startAndVerifyCluster(3, c)
// defer clean(servers...)
// coordinator := NewCoordinatorImpl(DEFAULT_CONFIGURATION, servers[0], servers[0].clusterConfig)
// time.Sleep(REPLICATION_LAG)
// // create users
// root, _ := coordinator.AuthenticateClusterAdmin("root", "root")
// coordinator.CreateDbUser(root, "db1", "db_admin", "db_pass")
// coordinator.SetDbAdmin(root, "db1", "db_admin", true)
// dbAdmin, _ := coordinator.AuthenticateDbUser("db1", "db_admin", "db_pass")
// coordinator.CreateDbUser(root, "db1", "db_user", "db_pass")
// dbUser, _ := coordinator.AuthenticateDbUser("db1", "db_user", "db_pass")
// allowedUsers := []*User{&root, &dbAdmin}
// disallowedUsers := []*User{&dbUser}
// // make sure that invalid continuous queries throw an error
// response := coordinator.CreateContinuousQuery(root, "db1", "select * from foo group by blah into bar;")
// c.Assert(response, ErrorMatches, "^Continuous queries with a group by clause must include .*")
// // cluster admins and db admins should be able to do everything
// for _, user := range allowedUsers {
// results, err := coordinator.ListContinuousQueries(*user, "db1")
// c.Assert(err, IsNil)
// c.Assert(results[0].Points, HasLen, 0)
// c.Assert(coordinator.CreateContinuousQuery(*user, "db1", "select * from foo into bar;"), IsNil)
// time.Sleep(REPLICATION_LAG)
// results, err = coordinator.ListContinuousQueries(*user, "db1")
// c.Assert(err, IsNil)
// c.Assert(results[0].Points, HasLen, 1)
// c.Assert(*results[0].Points[0].Values[0].Int64Value, Equals, int64(1))
// c.Assert(*results[0].Points[0].Values[1].StringValue, Equals, "select * from foo into bar;")
// c.Assert(coordinator.DeleteContinuousQuery(*user, "db1", 1), IsNil)
// results, err = coordinator.ListContinuousQueries(*user, "db1")
// c.Assert(err, IsNil)
// c.Assert(results[0].Points, HasLen, 0)
// }
// // regular database users shouldn't be able to do anything
// for _, user := range disallowedUsers {
// _, err := coordinator.ListContinuousQueries(*user, "db1")
// c.Assert(err, NotNil)
// c.Assert(coordinator.CreateContinuousQuery(*user, "db1", "select * from foo into bar;"), NotNil)
// c.Assert(coordinator.DeleteContinuousQuery(*user, "db1", 1), NotNil)
// }
// coordinator.DeleteDbUser(root, "db1", "db_admin")
// coordinator.DeleteDbUser(root, "db1", "db_user")
// }
// func (self *CoordinatorSuite) TestDbAdminOperations(c *C) {
// servers := startAndVerifyCluster(3, c)
// defer clean(servers...)
// coordinator := NewCoordinatorImpl(DEFAULT_CONFIGURATION, servers[0], servers[0].clusterConfig)
// time.Sleep(REPLICATION_LAG)
// // create a db user
// root, err := coordinator.AuthenticateClusterAdmin("root", "root")
// c.Assert(err, IsNil)
// c.Assert(root.IsClusterAdmin(), Equals, true)
// c.Assert(coordinator.CreateDbUser(root, "db1", "db_user", "db_pass"), IsNil)
// c.Assert(coordinator.SetDbAdmin(root, "db1", "db_user", true), IsNil)
// dbUser, err := coordinator.AuthenticateDbUser("db1", "db_user", "db_pass")
// c.Assert(err, IsNil)
// // Cannot create or delete other cluster admin
// c.Assert(coordinator.CreateClusterAdminUser(dbUser, "another_cluster_admin", "somepassword"), NotNil)
// c.Assert(coordinator.DeleteClusterAdminUser(dbUser, "root"), NotNil)
// // cannot get cluster admin
// _, err = coordinator.ListClusterAdmins(dbUser)
// c.Assert(err, NotNil)
// // can create db users
// c.Assert(coordinator.CreateDbUser(dbUser, "db1", "db_user2", "db_pass"), IsNil)
// u, err := coordinator.AuthenticateDbUser("db1", "db_user2", "db_pass")
// c.Assert(err, IsNil)
// c.Assert(u.IsClusterAdmin(), Equals, false)
// c.Assert(u.IsDbAdmin("db1"), Equals, false)
// // can get db users
// admins, err := coordinator.ListDbUsers(dbUser, "db1")
// c.Assert(err, IsNil)
// c.Assert(admins[0].GetName(), Equals, "db_user")
// c.Assert(admins[0].IsDbAdmin("db1"), Equals, true)
// c.Assert(admins[1].GetName(), Equals, "db_user2")
// c.Assert(admins[1].IsDbAdmin("db1"), Equals, false)
// // cannot create db users for a different db
// c.Assert(coordinator.CreateDbUser(dbUser, "db2", "db_user", "somepassword"), NotNil)
// // cannot get db users for a different db
// _, err = coordinator.ListDbUsers(dbUser, "db2")
// c.Assert(err, NotNil)
// // can make db users db admins
// c.Assert(coordinator.SetDbAdmin(dbUser, "db1", "db_user2", true), IsNil)
// u, err = coordinator.AuthenticateDbUser("db1", "db_user2", "db_pass")
// c.Assert(err, IsNil)
// c.Assert(u.IsDbAdmin("db1"), Equals, true)
// // can delete db users
// c.Assert(coordinator.DeleteDbUser(dbUser, "db1", "db_user2"), IsNil)
// }
// func (self *CoordinatorSuite) TestDbUserOperations(c *C) {
// servers := startAndVerifyCluster(3, c)
// defer clean(servers...)
// coordinator := NewCoordinatorImpl(DEFAULT_CONFIGURATION, servers[0], servers[0].clusterConfig)
// time.Sleep(REPLICATION_LAG)
// // create a db user
// root, err := coordinator.AuthenticateClusterAdmin("root", "root")
// c.Assert(err, IsNil)
// c.Assert(root.IsClusterAdmin(), Equals, true)
// c.Assert(coordinator.CreateDbUser(root, "db1", "db_user", "db_pass"), IsNil)
// dbUser, err := coordinator.AuthenticateDbUser("db1", "db_user", "db_pass")
// c.Assert(err, IsNil)
// // Cannot create other cluster admin
// c.Assert(coordinator.CreateClusterAdminUser(dbUser, "another_cluster_admin", "somepass"), NotNil)
// // can create db users
// c.Assert(coordinator.CreateDbUser(dbUser, "db1", "db_user2", "somepass"), NotNil)
// // cannot make itself an admin
// c.Assert(coordinator.SetDbAdmin(dbUser, "db1", "db_user", true), NotNil)
// // cannot create db users for a different db
// c.Assert(coordinator.CreateDbUser(dbUser, "db2", "db_user", "somepass"), NotNil)
// // can change its own password
// c.Assert(coordinator.ChangeDbUserPassword(dbUser, "db1", "db_user", "new_password"), IsNil)
// dbUser, err = coordinator.AuthenticateDbUser("db1", "db_user", "db_pass")
// c.Assert(err, NotNil)
// dbUser, err = coordinator.AuthenticateDbUser("db1", "db_user", "new_password")
// c.Assert(err, IsNil)
// }
// func (self *CoordinatorSuite) TestUserDataReplication(c *C) {
// servers := startAndVerifyCluster(3, c)
// defer clean(servers...)
// coordinators := make([]*CoordinatorImpl, 0, len(servers))
// for _, server := range servers {
// coordinators = append(coordinators, NewCoordinatorImpl(DEFAULT_CONFIGURATION, server, server.clusterConfig))
// }
// // root must exist on all three nodes
// var root User
// var err error
// for _, coordinator := range coordinators {
// root, err = coordinator.AuthenticateClusterAdmin("root", "root")
// c.Assert(err, IsNil)
// c.Assert(root.IsClusterAdmin(), Equals, true)
// }
// c.Assert(coordinators[0].CreateClusterAdminUser(root, "admin", "admin"), IsNil)
// time.Sleep(REPLICATION_LAG)
// for _, coordinator := range coordinators {
// u, err := coordinator.AuthenticateClusterAdmin("admin", "admin")
// c.Assert(err, IsNil)
// c.Assert(u.IsClusterAdmin(), Equals, true)
// }
// }
// func (self *CoordinatorSuite) createDatabases(servers []*RaftServer, c *C) {
// err := servers[0].CreateDatabase("db1", 0)
// c.Assert(err, IsNil)
// err = servers[1].CreateDatabase("db2", 1)
// c.Assert(err, IsNil)
// err = servers[2].CreateDatabase("db3", 3)
// c.Assert(err, IsNil)
// }
// func (self *CoordinatorSuite) TestCanCreateDatabaseWithNameAndReplicationFactor(c *C) {
// servers := startAndVerifyCluster(3, c)
// defer clean(servers...)
// self.createDatabases(servers, c)
// time.Sleep(REPLICATION_LAG)
// for i := 0; i < 3; i++ {
// databases := servers[i].clusterConfig.DatabaseReplicationFactors
// c.Assert(databases, DeepEquals, map[string]uint8{
// "db1": 1,
// "db2": 1,
// "db3": 3,
// })
// }
// err := servers[0].CreateDatabase("db3", 1)
// c.Assert(err, ErrorMatches, ".*db3 exists.*")
// err = servers[2].CreateDatabase("db3", 1)
// c.Assert(err, ErrorMatches, ".*db3 exists.*")
// }
// func (self *CoordinatorSuite) TestCanDropDatabaseWithName(c *C) {
// servers := startAndVerifyCluster(3, c)
// defer clean(servers...)
// self.createDatabases(servers, c)
// err := servers[0].DropDatabase("db1")
// c.Assert(err, IsNil)
// err = servers[1].DropDatabase("db2")
// c.Assert(err, IsNil)
// err = servers[2].DropDatabase("db3")
// c.Assert(err, IsNil)
// time.Sleep(REPLICATION_LAG)
// for i := 0; i < 3; i++ {
// databases := servers[i].clusterConfig.GetDatabases()
// c.Assert(databases, HasLen, 0)
// }
// err = servers[0].DropDatabase("db3")
// c.Assert(err, ErrorMatches, ".*db3 doesn't exist.*")
// err = servers[2].DropDatabase("db3")
// c.Assert(err, ErrorMatches, ".*db3 doesn't exist.*")
// }
// func (self *CoordinatorSuite) TestCheckReadAccess(c *C) {
// coordinator := NewCoordinatorImpl(DEFAULT_CONFIGURATION, nil, nil)
// mock := `{
// "points": [
// {
// "values": [
// {
// "int64_value": 3
// }
// ],
// "sequence_number": 1,
// "timestamp": 23423
// }
// ],
// "name": "foo",
// "fields": ["value"]
// }`
// series := stringToSeries(mock, c)
// user := &MockUser{
// dbCannotWrite: map[string]bool{"foo": true},
// }
// err := coordinator.WriteSeriesData(user, "foo", []*protocol.Series{series})
// c.Assert(err, ErrorMatches, ".*Insufficient permission.*")
// }
// func (self *CoordinatorSuite) TestServersGetUniqueIdsAndCanActivateCluster(c *C) {
// servers := startAndVerifyCluster(3, c)
// defer clean(servers...)
// // ensure they're all in the same order across the cluster
// expectedServers := servers[0].clusterConfig.Servers()
// for _, server := range servers {
// c.Assert(server.clusterConfig.Servers(), HasLen, len(expectedServers))
// for i, clusterServer := range expectedServers {
// c.Assert(server.clusterConfig.Servers()[i].Id, Equals, clusterServer.Id)
// }
// }
// // ensure cluster server ids are unique
// idMap := make(map[uint32]bool)
// for _, clusterServer := range servers[0].clusterConfig.Servers() {
// _, ok := idMap[clusterServer.Id]
// c.Assert(ok, Equals, false)
// idMap[clusterServer.Id] = true
// }
// }
// func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader(c *C) {
// servers := startAndVerifyCluster(2, c)
// defer clean(servers...)
// newServer := newConfigAndServer(c)
// defer clean(newServer)
// l, err := net.Listen("tcp4", ":0")
// c.Assert(err, IsNil)
// leaderAddr, ok := servers[1].leaderConnectString()
// c.Assert(ok, Equals, true)
// leaderPort, _ := strconv.Atoi(strings.Split(leaderAddr, ":")[2])
// followerPort := servers[1].port
// if leaderPort == servers[1].port {
// followerPort = servers[0].port
// }
// newServer.config.SeedServers = []string{fmt.Sprintf("http://localhost:%d", followerPort)}
// newServer.Serve(l)
// err = servers[0].CreateDatabase("db8", uint8(1))
// c.Assert(err, Equals, nil)
// time.Sleep(REPLICATION_LAG)
// assertConfigContains(newServer.port, "db8", true, c)
// }

View File

@ -1,79 +0,0 @@
package integration
import . ""
type EngineSuite struct {
var _ = Suite(&EngineSuite{})
// func (self *EngineSuite) TestCountQueryWithGroupByTimeInvalidArgument(c *C) {
// err := common.NewQueryError(common.InvalidArgument, "invalid argument foobar to the time function")
// self.createEngine(c, `[]`)
// runQueryRunError(engine, "select count(*) from foo group by time(foobar) order asc", c, err)
// }
// func (self *EngineSuite) TestPercentileQueryWithInvalidNumberOfArguments(c *C) {
// err := common.NewQueryError(common.WrongNumberOfArguments, "function percentile() requires exactly two arguments")
// self.createEngine(c, `[]`)
// runQueryRunError(engine, "select percentile(95) from foo group by time(1m) order asc", c, err)
// }
// func (self *EngineSuite) TestPercentileQueryWithNonNumericArguments(c *C) {
// err := common.NewQueryError(common.InvalidArgument, "function percentile() requires a numeric second argument between 0 and 100")
// self.createEngine(c, `[]`)
// runQueryRunError(engine, "select percentile(column_one, a95) from foo group by time(1m) order asc", c, err)
// }
// func (self *EngineSuite) TestPercentileQueryWithOutOfBoundNumericArguments(c *C) {
// err := common.NewQueryError(common.InvalidArgument, "function percentile() requires a numeric second argument between 0 and 100")
// self.createEngine(c, `[]`)
// runQueryRunError(engine, "select percentile(column_one, 0) from foo group by time(1m) order asc", c, err)
// runQueryRunError(engine, "select percentile(column_one, 105) from foo group by time(1m) order asc", c, err)
// }
// func (self *DataTestSuite) BasicQueryError(c *C) {
// // create an engine and assert the engine works as a passthrough if
// // the query only returns the raw data
// engine := createEngine(c, "[]")
// engine.coordinator.(*MockCoordinator).returnedError = fmt.Errorf("some error")
// err := engine.coordinator.RunQuery(nil, "", "select * from foo", func(series *protocol.Series) error {
// return nil
// })
// c.Assert(err, ErrorMatches, "some error")
// }
// func (self *DataTestSuite) CountQueryWithGroupByTimeInvalidNumberOfArguments(c *C) (Fun, Fun) {
// err := common.NewQueryError(common.WrongNumberOfArguments, "time function only accepts one argument")
// createEngine(client, c, `[]`)
// runQueryRunError(engine, "select count(*) from foo group by time(1h, 1m) order asc", c, err)
// }
// func (self *DataTestSuite) CountQueryWithInvalidWildcardArgument(c *C) (Fun, Fun) {
// return func(client Client) {
// createEngine(client, c, `
// [
// {
// "points": [
// {
// "values": [
// {
// "int64_value": 100
// }
// ],
// "timestamp": 1381346641000000
// }
// ],
// "name": "foo",
// "fields": ["column_one"]
// }
// ]
// `)
// }, func(client Client) {
// query := "select count(*) from foo group by time(1h) order asc"
// body, code := self.server.GetErrorBody("test_db", query, "root", "root", false, c)
// c.Assert(code, Equals, http.StatusBadRequest)
// c.Assert(body, Matches, ".*count.*")
// }
// }

View File

@ -1,110 +0,0 @@
package integration
This is commented out because we can't generate old data automatically. The files for this test
are up on S3 so that we can run it later. Just trust that I've run it (this is Paul)
// import (
// "fmt"
// "io/ioutil"
// "net/http"
// "os"
// "path/filepath"
// "time"
// ""
// ""
// . ""
// . ""
// )
// type MigrationTestSuite struct {
// server *Server
// }
// var _ = Suite(&MigrationTestSuite{})
// func (self *MigrationTestSuite) setup(file string, c *C) {
// self.server = NewServer(fmt.Sprintf("integration/%s", file), c)
// }
// func (self *MigrationTestSuite) teardown(dir string, c *C) {
// self.server.Stop()
// dataDir := fmt.Sprintf("./%s/data", "migration_data")
// shardDir := filepath.Join(dataDir, migration.OLD_SHARD_DIR)
// infos, err := ioutil.ReadDir(shardDir)
// if err != nil {
// fmt.Printf("Error Clearing Migration: ", err)
// return
// }
// for _, info := range infos {
// if info.IsDir() {
// err := os.Remove(filepath.Join(shardDir, info.Name(), migration.MIGRATED_MARKER))
// if err != nil {
// fmt.Printf("Error Clearing Migration: ", err)
// }
// }
// }
// err = os.RemoveAll(filepath.Join(dataDir, datastore.SHARD_DATABASE_DIR))
// if err != nil {
// fmt.Printf("Error Clearing Migration: ", err)
// }
// }
// func (self *MigrationTestSuite) TestMigrationOfPreviousDb(c *C) {
// self.setup("migration_test.toml", c)
// defer self.teardown("migration_data", c)
// _, err := http.Post("http://localhost:8086/cluster/migrate_data?u=root&p=root", "application/json", nil)
// c.Assert(err, IsNil)
// // make sure that it won't kick it off a second time while it's already running
// resp, _ := http.Post("http://localhost:8086/cluster/migrate_data?u=root&p=root", "application/json", nil)
// c.Assert(resp.StatusCode, Equals, http.StatusForbidden)
// time.Sleep(time.Second * 60)
// client := self.server.GetClient("test1", c)
// s, err := client.Query("select count(value) from cpu_idle")
// c.Assert(err, IsNil)
// c.Assert(s, HasLen, 1)
// c.Assert(s[0].Points, HasLen, 1)
// c.Assert(s[0].Points[0][1].(float64), Equals, float64(44434))
// s, err = client.Query("select count(type) from customer_events")
// c.Assert(err, IsNil)
// c.Assert(s, HasLen, 1)
// c.Assert(s[0].Points, HasLen, 1)
// c.Assert(s[0].Points[0][1].(float64), Equals, float64(162597))
// client = self.server.GetClient("test2", c)
// s, err = client.Query("list series")
// c.Assert(err, IsNil)
// c.Assert(s, HasLen, 1)
// c.Assert(s[0].Points, HasLen, 1000)
// s, err = client.Query("select count(value) from /.*/")
// c.Assert(s, HasLen, 1000)
// for _, series := range s {
// c.Assert(series.Points, HasLen, 1)
// c.Assert(series.Points[0][1].(float64), Equals, float64(1434))
// }
// _, err = http.Post("http://localhost:8086/cluster/migrate_data?u=root&p=root", "application/json", nil)
// c.Assert(err, IsNil)
// time.Sleep(time.Second * 5)
// }
// func (self *MigrationTestSuite) TestDoesntPanicOnPreviousSnapshot(c *C) {
// self.setup("migration_test2.toml", c)
// defer self.teardown("migration_data2", c)
// _, err := http.Post("http://localhost:8086/cluster/migrate_data?u=root&p=root", "application/json", nil)
// c.Assert(err, IsNil)
// time.Sleep(time.Second * 5)
// client := self.server.GetClient("test", c)
// s, err := client.Query("select count(value) from foo")
// c.Assert(err, IsNil)
// c.Assert(s, HasLen, 1)
// c.Assert(s[0].Points, HasLen, 1)
// c.Assert(s[0].Points[0][1].(float64), Equals, float64(1))
// }

View File

@ -736,18 +736,18 @@ type UnsubscribeCommand struct {
type MessageType uint16
const (
ConfigMessageType = 1 << 15
BrokerMessageType = 1 << 15
const (
CreateTopicMessageType = ConfigMessageType | MessageType(0x00)
DeleteTopicMessageType = ConfigMessageType | MessageType(0x01)
CreateTopicMessageType = BrokerMessageType | MessageType(0x00)
DeleteTopicMessageType = BrokerMessageType | MessageType(0x01)
CreateReplicaMessageType = ConfigMessageType | MessageType(0x10)
DeleteReplicaMessageType = ConfigMessageType | MessageType(0x11)
CreateReplicaMessageType = BrokerMessageType | MessageType(0x10)
DeleteReplicaMessageType = BrokerMessageType | MessageType(0x11)
SubscribeMessageType = ConfigMessageType | MessageType(0x20)
UnsubscribeMessageType = ConfigMessageType | MessageType(0x21)
SubscribeMessageType = BrokerMessageType | MessageType(0x20)
UnsubscribeMessageType = BrokerMessageType | MessageType(0x21)
// The size of the encoded message header, in bytes.

View File

@ -1,10 +1,13 @@
package messaging
import (
@ -53,6 +56,16 @@ func (c *Client) URLs() []*url.URL {
return c.urls
// LeaderURL returns the URL of the broker leader.
func (c *Client) LeaderURL() *url.URL {
// TODO(benbjohnson): Actually keep track of the leader.
// HACK(benbjohnson): For testing, just grab a url.
return c.urls[0]
// Open initializes and opens the connection to the broker cluster.
func (c *Client) Open(urls []*url.URL) error {
@ -108,6 +121,31 @@ func (c *Client) Close() error {
return nil
// Publish sends a message to the broker and returns an index or error.
func (c *Client) Publish(typ MessageType, data []byte) (uint64, error) {
// Send the message to the messages endpoint.
u := c.LeaderURL()
u.RawQuery = url.Values{"type": {strconv.Itoa(int(typ))}}.Encode()
resp, err := http.Post(u.String(), "application/octet-stream", bytes.NewReader(data))
if err != nil {
return 0, err
defer func() { _ = resp.Body.Close() }()
// If a non-200 status is returned then an error occurred.
if resp.StatusCode != http.StatusOK {
return 0, errors.New(resp.Header.Get("X-Broker-Error"))
// Parse broker index.
index, err := strconv.ParseUint(resp.Header.Get("X-Broker-Index"), 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid index: %s", err)
return index, nil
// streamer connects to a broker server and streams the replica's messages.
func (c *Client) streamer(done chan chan struct{}) {
for {

View File

@ -24,6 +24,9 @@ var (
// ErrReplicaNotFound is returned when referencing a replica that doesn't exist.
ErrReplicaNotFound = errors.New("replica not found")
// ErrReplicaNameRequired is returned when finding a replica without a name.
ErrReplicaNameRequired = errors.New("replica name required")
// errReplicaUnavailable is returned when writing bytes to a replica when
// there is no writer attached to the replica.
errReplicaUnavailable = errors.New("replica unavailable")
@ -36,4 +39,10 @@ var (
// ErrBrokerURLRequired is returned when opening a broker without URLs.
ErrBrokerURLRequired = errors.New("broker url required")
// ErrMessageTypeRequired is returned publishing a message without a type.
ErrMessageTypeRequired = errors.New("message type required")
// ErrTopicRequired is returned publishing a message without a topic ID.
ErrTopicRequired = errors.New("topic required")

View File

@ -1,7 +1,9 @@
package messaging
import (
@ -32,25 +34,34 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Route all InfluxDB broker requests.
switch r.URL.Path {
case "/stream":
h.serveStream(w, r)
if r.Method == "GET" {, r)
} else {
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
case "/messages":
if r.Method == "POST" {
h.publish(w, r)
} else {
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
// connects the requestor as the replica's writer.
func (h *Handler) serveStream(w http.ResponseWriter, r *http.Request) {
func (h *Handler) stream(w http.ResponseWriter, r *http.Request) {
// Retrieve the replica name.
name := r.URL.Query().Get("name")
if name == "" {
w.Header().Set("X-Broker-Error", "replica name required")
http.Error(w, "replica name required", http.StatusBadRequest)
h.error(w, ErrReplicaNameRequired, http.StatusBadRequest)
// Find the replica on the broker.
replica :=
if replica == nil {
w.Header().Set("X-Broker-Error", ErrReplicaNotFound.Error())
http.Error(w, ErrReplicaNotFound.Error(), http.StatusNotFound)
h.error(w, ErrReplicaNotFound, http.StatusNotFound)
@ -58,3 +69,49 @@ func (h *Handler) serveStream(w http.ResponseWriter, r *http.Request) {
// This will block until the replica is closed or a new writer connects.
_, _ = replica.WriteTo(w)
// publishes a message to the broker.
func (h *Handler) publish(w http.ResponseWriter, r *http.Request) {
m := &Message{}
// Read the message type.
if n, err := strconv.ParseUint(r.URL.Query().Get("type"), 10, 16); err != nil {
h.error(w, ErrMessageTypeRequired, http.StatusBadRequest)
} else {
m.Type = MessageType(n)
// Read the topic ID.
if n, err := strconv.ParseUint(r.URL.Query().Get("topicID"), 10, 32); err != nil {
h.error(w, ErrTopicRequired, http.StatusBadRequest)
} else {
m.TopicID = uint32(n)
// Read the request body.
data, err := ioutil.ReadAll(r.Body)
if err != nil {
h.error(w, err, http.StatusInternalServerError)
m.Data = data
// Publish message to the broker.
index, err :=
if err != nil {
h.error(w, err, http.StatusInternalServerError)
// Return index.
w.Header().Set("X-Broker-Index", strconv.FormatUint(index, 10))
// error writes an error to the client and sets the status code.
func (h *Handler) error(w http.ResponseWriter, err error, code int) {
s := err.Error()
w.Header().Set("X-Broker-Error", s)
http.Error(w, s, code)

View File

@ -1,6 +1,7 @@
package influxdb
import (
@ -17,6 +18,8 @@ type Server struct {
path string
client MessagingClient
databases map[string]*Database
done chan struct{}
// NewServer returns a new instance of Server.
@ -35,12 +38,17 @@ func (s *Server) Path() string { return s.path }
// Open initializes the server from a given path.
func (s *Server) Open(path string) error {
// Ensure the server isn't already open and there's a path provided.
if s.opened() {
return ErrServerOpen
} else if path == "" {
return ErrPathRequired
// Start goroutine to read messages from the broker.
s.done = make(chan struct{}, 0)
go s.processor(s.done)
return nil
@ -49,16 +57,115 @@ func (s *Server) opened() bool { return s.path != "" }
// Close shuts down the server.
func (s *Server) Close() error {
// TODO: Close metadata.
if !s.opened() {
return ErrServerClosed
// Close notification.
s.done = nil
// Remove path.
s.path = ""
return nil
// publish encodes a message as JSON and send it to the broker.
// This function does not wait for the message to be processed.
// Returns the broker log index of the message or an error.
func (s *Server) publish(typ *messaging.MessageType, c interface{}) (uint64, error) {
// Encode the command.
b, err := json.Marshal(c)
if err != nil {
return 0, err
// Publish the message.
return s.client.Publish(typ, b)
// publishSync encodes a message as JSON and send it to the broker.
// This function will wait until the message is processed before returning.
// Returns an error if the message could not be successfully published.
func (s *Server) publishSync(typ *messaging.MessageType, c interface{}) error {
// Publish the message.
index, err := s.publish(typ, c)
if err != nil {
return err
// Wait for the message to be processed.
if err := s.client.wait(index); err != nil {
return err
return nil
// CreateDatabase creates a new database.
func (s *Server) CreateDatabase(name string) error {
if s.databases[name] != nil {
return ErrDatabaseExists
return s.publishSync(createDatabaseMessageType, &createDatabaseCommand{Name: name})
func (s *Server) applyCreateDatabase(c *createDatabaseCommand) {
if s.databases[c.Name] != nil {
// Create database entry.
s.databases[c.Name] = &Database{Name: c.Name}
// WriteSeries writes series data to the broker.
func (s *Server) WriteSeries(u *ClusterAdmin, database string, series *protocol.Series) error {
// TODO:
return nil
// processor runs in a separate goroutine and processes all incoming broker messages.
func (s *Server) processor(done chan struct{}) {
client := s.client
for {
// Read incoming message.
var m *messaging.Message
select {
case <-done:
case m = <-client.C():
// Process message.
switch m.Type {
case createDatabaseMessageType:
var c createDatabaseCommand
mustUnmarshal(m.Data, &c)
b.applyCreateDatabase(c.ID, c.Name)
const (
createDatabaseMessageType = messaging.MessageType(0x00)
deleteDatabaseMessageType = messaging.MessageType(0x01)
// createDatabaseCommand creates a new database.
type createDatabaseCommand struct {
Name string `json:"name"`
// MessagingClient represents the client used to receive messages from brokers.
type MessagingClient interface {
C() <-chan *messaging.Message
@ -156,6 +263,26 @@ func compileRegex(s string) (*regexp.Regexp, error) {
return regexp.Compile(s)
// mustMarshal encodes a value to JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid marshal will cause corruption and a panic is appropriate.
func mustMarshal(v interface{}) []byte {
b, err := json.Marshal(v)
if err != nil {
panic("marshal: " + err.Error())
return b
// mustUnmarshal decodes a value from JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid unmarshal will cause corruption and a panic is appropriate.
func mustUnmarshal(b []byte, v interface{}) {
if err := json.Unmarshal(b, v); err != nil {
panic("unmarshal: " + err.Error())
// assert will panic with a given formatted message if the given condition is false.
func assert(condition bool, msg string, v ...interface{}) {
if !condition {

View File

@ -14,10 +14,13 @@ import (
func TestServer_Open(t *testing.T) {
c := NewMessagingClient()
s := NewServer(c)
defer s.Close()
if err := s.Server.Open(tempfile()); err != nil {
if err := s.Server.Close(); err != nil {
// Server is a wrapping test struct for influxdb.Server.