Merge pull request #2128 from influxdb/broker-discovery

Broker discovery
pull/2153/head
Jason Wilder 2015-04-02 12:28:35 -06:00
commit ea7fe9b3d5
15 changed files with 330 additions and 182 deletions

View File

@ -1,8 +1,11 @@
## v0.9.0-rc20 [unreleased]
### Features
- [#2128](https://github.com/influxdb/influxdb/pull/2128): Data node discovery from brokers
### Bugfixes
- [#2147](https://github.com/influxdb/influxdb/pull/2147): Set Go Max procs in a better location
-
-
## v0.9.0-rc19 [2015-04-01]
### Features

View File

@ -16,6 +16,10 @@ import (
// Ensure the restore command can expand a snapshot and bootstrap a broker.
func TestRestoreCommand(t *testing.T) {
if testing.Short() {
t.Skip("skipping TestRestoreCommand")
}
now := time.Now()
// Create root path to server.
@ -51,7 +55,7 @@ func TestRestoreCommand(t *testing.T) {
}
// Start server.
b, s := main.Run(c, "", "x.x")
b, s, l := main.Run(c, "", "x.x")
if b == nil {
t.Fatal("cannot run broker")
} else if s == nil {
@ -84,6 +88,7 @@ func TestRestoreCommand(t *testing.T) {
f.Close()
// Stop server.
l.Close()
s.Close()
b.Close()
@ -105,7 +110,7 @@ func TestRestoreCommand(t *testing.T) {
}
// Restart server.
b, s = main.Run(c, "", "x.x")
b, s, l = main.Run(c, "", "x.x")
if b == nil {
t.Fatal("cannot run broker")
} else if s == nil {
@ -135,6 +140,11 @@ func TestRestoreCommand(t *testing.T) {
} else if !reflect.DeepEqual(v, map[string]interface{}{"value": float64(1000)}) {
t.Fatalf("read series(1) mismatch: %#v", v)
}
// Stop server.
l.Close()
s.Close()
b.Close()
}
// RestoreCommand is a test wrapper for main.RestoreCommand.

View File

@ -23,7 +23,7 @@ import (
"github.com/influxdb/influxdb/udp"
)
func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Server) {
func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Server, *raft.Log) {
log.Printf("influxdb started, version %s, commit %s", version, commit)
var initBroker, initServer bool
@ -217,7 +217,7 @@ func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Ser
}
}
return b.Broker, s
return b.Broker, s, l
}
// write the current process id to a file specified by path.
@ -327,7 +327,7 @@ func openServer(config *Config, b *influxdb.Broker, initServer, initBroker bool,
}
// Create messaging client to the brokers.
c := influxdb.NewMessagingClient()
c := influxdb.NewMessagingClient(config.DataURL())
if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile)); err != nil {
log.Fatalf("messaging client error: %s", err)
}

View File

@ -19,6 +19,7 @@ import (
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/raft"
"github.com/influxdb/influxdb/client"
main "github.com/influxdb/influxdb/cmd/influxd"
@ -59,6 +60,7 @@ func rewriteDbRp(old, database, retention string) string {
type Node struct {
broker *messaging.Broker
server *influxdb.Server
log *raft.Log
url *url.URL
leader bool
}
@ -66,6 +68,26 @@ type Node struct {
// Cluster represents a multi-node cluster.
type Cluster []*Node
func (c *Cluster) Close() {
for _, n := range *c {
if n.log != nil {
n.log.Close()
n.log = nil
}
if n.server != nil {
n.server.Close()
n.server = nil
}
if n.broker != nil {
n.broker.Close()
n.broker = nil
}
}
}
// createCombinedNodeCluster creates a cluster of nServers nodes, each of which
// runs as both a Broker and Data node. If any part cluster creation fails,
// the testing is marked as failed.
@ -102,7 +124,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba
c.ReportingDisabled = true
c.Snapshot.Enabled = false
b, s := main.Run(c, "", "x.x")
b, s, l := main.Run(c, "", "x.x")
if b == nil {
t.Fatalf("Test %s: failed to create broker on port %d", testName, basePort)
}
@ -112,6 +134,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba
nodes = append(nodes, &Node{
broker: b,
server: s,
log: l,
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(basePort)},
leader: true,
})
@ -124,7 +147,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba
c.Broker.Port = nextPort
c.Data.Port = nextPort
b, s := main.Run(c, "http://localhost:"+strconv.Itoa(basePort), "x.x")
b, s, l := main.Run(c, "http://localhost:"+strconv.Itoa(basePort), "x.x")
if b == nil {
t.Fatalf("Test %s: failed to create following broker on port %d", testName, basePort)
}
@ -135,6 +158,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba
nodes = append(nodes, &Node{
broker: b,
server: s,
log: l,
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(nextPort)},
})
}
@ -1243,6 +1267,7 @@ func TestSingleServer(t *testing.T) {
}()
nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090, nil)
defer nodes.Close()
runTestsData(t, testName, nodes, "mydb", "myrp")
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp")

View File

@ -139,7 +139,7 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) {
// Ensure that even if a measurement is not found, that the status code is still 200
func TestHandler_ShowMeasurementsNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -163,7 +163,7 @@ func TestHandler_ShowMeasurementsNotFound(t *testing.T) {
}
func TestHandler_CreateDatabase(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -178,7 +178,7 @@ func TestHandler_CreateDatabase(t *testing.T) {
}
func TestHandler_CreateDatabase_BadRequest_NoName(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -191,7 +191,7 @@ func TestHandler_CreateDatabase_BadRequest_NoName(t *testing.T) {
}
func TestHandler_CreateDatabase_Conflict(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -207,7 +207,7 @@ func TestHandler_CreateDatabase_Conflict(t *testing.T) {
}
func TestHandler_DropDatabase(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -223,7 +223,7 @@ func TestHandler_DropDatabase(t *testing.T) {
}
func TestHandler_DropDatabase_NotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -238,7 +238,7 @@ func TestHandler_DropDatabase_NotFound(t *testing.T) {
}
func TestHandler_RetentionPolicies(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -256,7 +256,7 @@ func TestHandler_RetentionPolicies(t *testing.T) {
}
func TestHandler_RetentionPolicies_DatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -272,7 +272,7 @@ func TestHandler_RetentionPolicies_DatabaseNotFound(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -290,7 +290,7 @@ func TestHandler_CreateRetentionPolicy(t *testing.T) {
}
func TestHandler_CreateRetentionPolicyAsDefault(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -315,7 +315,7 @@ func TestHandler_CreateRetentionPolicyAsDefault(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy_DatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -330,7 +330,7 @@ func TestHandler_CreateRetentionPolicy_DatabaseNotFound(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy_Conflict(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -348,7 +348,7 @@ func TestHandler_CreateRetentionPolicy_Conflict(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy_BadRequest(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -364,7 +364,7 @@ func TestHandler_CreateRetentionPolicy_BadRequest(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -396,7 +396,7 @@ func TestHandler_UpdateRetentionPolicy(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy_BadRequest(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -414,7 +414,7 @@ func TestHandler_UpdateRetentionPolicy_BadRequest(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy_DatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -430,7 +430,7 @@ func TestHandler_UpdateRetentionPolicy_DatabaseNotFound(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy_NotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -448,7 +448,7 @@ func TestHandler_UpdateRetentionPolicy_NotFound(t *testing.T) {
}
func TestHandler_DeleteRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -467,7 +467,7 @@ func TestHandler_DeleteRetentionPolicy(t *testing.T) {
}
func TestHandler_DeleteRetentionPolicy_DatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -484,7 +484,7 @@ func TestHandler_DeleteRetentionPolicy_DatabaseNotFound(t *testing.T) {
}
func TestHandler_DeleteRetentionPolicy_NotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -502,7 +502,7 @@ func TestHandler_DeleteRetentionPolicy_NotFound(t *testing.T) {
}
func TestHandler_GzipEnabled(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -528,7 +528,7 @@ func TestHandler_GzipEnabled(t *testing.T) {
}
func TestHandler_GzipDisabled(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -554,7 +554,7 @@ func TestHandler_GzipDisabled(t *testing.T) {
}
func TestHandler_Index(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -572,7 +572,7 @@ func TestHandler_Index(t *testing.T) {
}
func TestHandler_Wait(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -590,7 +590,7 @@ func TestHandler_Wait(t *testing.T) {
}
func TestHandler_WaitIncrement(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -610,7 +610,7 @@ func TestHandler_WaitIncrement(t *testing.T) {
}
func TestHandler_WaitNoIndexSpecified(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -624,7 +624,7 @@ func TestHandler_WaitNoIndexSpecified(t *testing.T) {
}
func TestHandler_WaitInvalidIndexSpecified(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -638,7 +638,7 @@ func TestHandler_WaitInvalidIndexSpecified(t *testing.T) {
}
func TestHandler_WaitExpectTimeout(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -652,7 +652,7 @@ func TestHandler_WaitExpectTimeout(t *testing.T) {
}
func TestHandler_Ping(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -666,7 +666,7 @@ func TestHandler_Ping(t *testing.T) {
}
func TestHandler_PingHead(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -680,7 +680,7 @@ func TestHandler_PingHead(t *testing.T) {
}
func TestHandler_Users_MultipleUsers(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateUser("jdoe", "1337", false)
@ -700,7 +700,7 @@ func TestHandler_Users_MultipleUsers(t *testing.T) {
func TestHandler_UpdateUser(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateUser("jdoe", "1337", false)
@ -723,7 +723,7 @@ func TestHandler_UpdateUser(t *testing.T) {
func TestHandler_UpdateUser_PasswordBadRequest(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateUser("jdoe", "1337", false)
@ -740,7 +740,7 @@ func TestHandler_UpdateUser_PasswordBadRequest(t *testing.T) {
func TestHandler_DataNodes(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenUninitializedServer(c)
srvr.CreateDataNode(MustParseURL("http://localhost:1000"))
@ -759,7 +759,7 @@ func TestHandler_DataNodes(t *testing.T) {
func TestHandler_CreateDataNode(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenUninitializedServer(c)
s := NewHTTPServer(srvr)
@ -775,7 +775,7 @@ func TestHandler_CreateDataNode(t *testing.T) {
func TestHandler_CreateDataNode_BadRequest(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -791,7 +791,7 @@ func TestHandler_CreateDataNode_BadRequest(t *testing.T) {
func TestHandler_CreateDataNode_InternalServerError(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -807,7 +807,7 @@ func TestHandler_CreateDataNode_InternalServerError(t *testing.T) {
func TestHandler_DeleteDataNode(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDataNode(MustParseURL("http://localhost:1000"))
@ -824,7 +824,7 @@ func TestHandler_DeleteDataNode(t *testing.T) {
func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -841,7 +841,7 @@ func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) {
// Perform a subset of endpoint testing, with authentication enabled.
func TestHandler_AuthenticatedCreateAdminUser(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewAuthenticatedHTTPServer(srvr)
@ -864,7 +864,7 @@ func TestHandler_AuthenticatedCreateAdminUser(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_Unauthorized(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewAuthenticatedHTTPServer(srvr)
@ -877,7 +877,7 @@ func TestHandler_AuthenticatedDatabases_Unauthorized(t *testing.T) {
}
func TestHandler_QueryParamenterMissing(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -892,7 +892,7 @@ func TestHandler_QueryParamenterMissing(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_AuthorizedQueryParams(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
@ -907,7 +907,7 @@ func TestHandler_AuthenticatedDatabases_AuthorizedQueryParams(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_UnauthorizedQueryParams(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
@ -922,7 +922,7 @@ func TestHandler_AuthenticatedDatabases_UnauthorizedQueryParams(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_AuthorizedBasicAuth(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
@ -939,7 +939,7 @@ func TestHandler_AuthenticatedDatabases_AuthorizedBasicAuth(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_UnauthorizedBasicAuth(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
@ -956,7 +956,7 @@ func TestHandler_AuthenticatedDatabases_UnauthorizedBasicAuth(t *testing.T) {
}
func TestHandler_GrantDBPrivilege(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
// Create a cluster admin that will grant privilege to "john".
@ -995,7 +995,7 @@ func TestHandler_GrantDBPrivilege(t *testing.T) {
}
func TestHandler_RevokeAdmin(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
// Create a cluster admin that will revoke admin from "john".
@ -1029,7 +1029,7 @@ func TestHandler_RevokeAdmin(t *testing.T) {
}
func TestHandler_RevokeDBPrivilege(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
// Create a cluster admin that will revoke privilege from "john".
@ -1065,7 +1065,7 @@ func TestHandler_RevokeDBPrivilege(t *testing.T) {
}
func TestHandler_DropSeries(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1088,7 +1088,7 @@ func TestHandler_DropSeries(t *testing.T) {
}
func TestHandler_serveWriteSeries(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1111,7 +1111,7 @@ func TestHandler_serveWriteSeries(t *testing.T) {
}
func TestHandler_serveDump(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1142,7 +1142,7 @@ func TestHandler_serveDump(t *testing.T) {
}
func TestHandler_serveWriteSeriesWithNoFields(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateDatabase("foo")
@ -1162,7 +1162,7 @@ func TestHandler_serveWriteSeriesWithNoFields(t *testing.T) {
}
func TestHandler_serveWriteSeriesWithAuthNilUser(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateDatabase("foo")
@ -1183,7 +1183,7 @@ func TestHandler_serveWriteSeriesWithAuthNilUser(t *testing.T) {
}
func TestHandler_serveWriteSeries_noDatabaseExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewHTTPServer(srvr)
@ -1203,7 +1203,7 @@ func TestHandler_serveWriteSeries_noDatabaseExists(t *testing.T) {
}
func TestHandler_serveWriteSeries_errorHasJsonContentType(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -1230,7 +1230,7 @@ func TestHandler_serveWriteSeries_errorHasJsonContentType(t *testing.T) {
}
func TestHandler_serveWriteSeries_queryHasJsonContentType(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1290,7 +1290,7 @@ func TestHandler_serveWriteSeries_queryHasJsonContentType(t *testing.T) {
}
func TestHandler_serveWriteSeries_invalidJSON(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewHTTPServer(srvr)
@ -1309,7 +1309,7 @@ func TestHandler_serveWriteSeries_invalidJSON(t *testing.T) {
}
func TestHandler_serveWriteSeries_noDatabaseSpecified(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewHTTPServer(srvr)
@ -1328,7 +1328,7 @@ func TestHandler_serveWriteSeries_noDatabaseSpecified(t *testing.T) {
}
func TestHandler_serveWriteSeriesNonZeroTime(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1371,7 +1371,7 @@ func TestHandler_serveWriteSeriesNonZeroTime(t *testing.T) {
}
func TestHandler_serveWriteSeriesZeroTime(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1426,7 +1426,7 @@ func TestHandler_serveWriteSeriesZeroTime(t *testing.T) {
}
func TestHandler_serveWriteSeriesBatch(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1511,7 +1511,7 @@ func TestHandler_serveWriteSeriesBatch(t *testing.T) {
}
func TestHandler_serveWriteSeriesFieldTypeConflict(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1554,7 +1554,7 @@ func str2iface(strs []string) []interface{} {
}
func TestHandler_ProcessContinousQueries(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewAuthenticatedHTTPServer(srvr)

View File

@ -472,37 +472,51 @@ func (b *Broker) TopicReader(topicID, index uint64, streaming bool) io.ReadClose
return NewTopicReader(b.TopicPath(topicID), index, streaming)
}
// SetTopicMaxIndex updates the highest replicated index for a topic.
// SetTopicMaxIndex updates the highest replicated index for a topic and data URL.
// If a higher index is already set on the topic then the call is ignored.
// This index is only held in memory and is used for topic segment reclamation.
func (b *Broker) SetTopicMaxIndex(topicID, index uint64) error {
// The higheset replciated index per data URL is tracked separately from the current index
func (b *Broker) SetTopicMaxIndex(topicID, index uint64, u url.URL) error {
_, err := b.Publish(&Message{
Type: SetTopicMaxIndexMessageType,
Data: marshalTopicIndex(topicID, index),
Data: marshalTopicIndex(topicID, index, u),
})
return err
}
func (b *Broker) applySetTopicMaxIndex(m *Message) {
topicID, index := unmarshalTopicIndex(m.Data)
topicID, index, u := unmarshalTopicIndex(m.Data)
// Set index if it's not already set higher.
t := b.topics[topicID]
if t != nil && t.index < index {
t.index = index
if t := b.topics[topicID]; t != nil {
// Track the highest replicated index per data node URL
t.indexByURL[u] = index
if t.index < index {
t.index = index
}
}
}
func marshalTopicIndex(topicID, index uint64) []byte {
b := make([]byte, 16)
func marshalTopicIndex(topicID, index uint64, u url.URL) []byte {
s := []byte(u.String())
b := make([]byte, 16+2+len(s))
binary.BigEndian.PutUint64(b[0:8], topicID)
binary.BigEndian.PutUint64(b[8:16], index)
binary.BigEndian.PutUint16(b[16:18], uint16(len(s))) // URL string length
n := copy(b[18:], s) // URL string
assert(n == len(s), "marshal topic index too short. have %d, expectd %d", n, len(s))
return b
}
func unmarshalTopicIndex(b []byte) (topicID, index uint64) {
func unmarshalTopicIndex(b []byte) (topicID, index uint64, u url.URL) {
assert(len(b) >= 18, "unmarshal topic index too short. have %d, expected %d", len(b), 20)
topicID = binary.BigEndian.Uint64(b[0:8])
index = binary.BigEndian.Uint64(b[8:16])
n := binary.BigEndian.Uint16(b[16:18]) // URL length
du, err := url.Parse(string(b[18 : 18+n])) // URL
assert(err == nil, "unmarshal binary error: %s", err)
u = *du
return
}
@ -619,9 +633,13 @@ const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB
type Topic struct {
mu sync.Mutex
id uint64 // unique identifier
index uint64 // highest index replicated
index uint64 // current index
path string // on-disk path
// highest index replicated per data url. The unique set of keys across all topics
// provides a snapshot of the addresses of every data node in a cluster.
indexByURL map[url.URL]uint64
file *os.File // last segment writer
opened bool
@ -632,9 +650,9 @@ type Topic struct {
// NewTopic returns a new instance of Topic.
func NewTopic(id uint64, path string) *Topic {
return &Topic{
id: id,
path: path,
id: id,
path: path,
indexByURL: make(map[url.URL]uint64),
MaxSegmentSize: DefaultMaxSegmentSize,
}
}
@ -652,6 +670,13 @@ func (t *Topic) Index() uint64 {
return t.index
}
// IndexForURL returns the highest index replicated for a given data URL.
func (t *Topic) IndexForURL(u url.URL) uint64 {
t.mu.Lock()
defer t.mu.Unlock()
return t.indexByURL[u]
}
// SegmentPath returns the path to a segment starting with a given log index.
func (t *Topic) SegmentPath(index uint64) string {
t.mu.Lock()

View File

@ -120,17 +120,26 @@ func TestBroker_Apply_SetMaxTopicIndex(t *testing.T) {
t.Fatal("topic not created")
}
testDataURL, _ := url.Parse("http://localhost:1234/data")
data := []byte{0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 5} // topicID=20, index=5,
data = append(data, []byte{0, byte(len(testDataURL.String()))}...) // len= <url length>
data = append(data, []byte(testDataURL.String())...)
// Set topic #1's index to "2".
if err := b.Apply(&messaging.Message{
Index: 2,
Type: messaging.SetTopicMaxIndexMessageType,
Data: []byte{0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 5}, // topicID=20, index=5
Data: data,
}); err != nil {
t.Fatalf("apply error: %s", err)
}
if topic := b.Topic(20); topic.Index() != 5 {
t.Fatalf("unexpected topic index: %d", topic.Index())
}
if topic := b.Topic(20); topic.IndexForURL(*testDataURL) != 5 {
t.Fatalf("unexpected topic url index: %d", topic.IndexForURL(*testDataURL))
}
}
// Ensure the broker can read from topics after reopening.
@ -232,6 +241,9 @@ func TestBroker_SetTopicMaxIndex(t *testing.T) {
b := OpenBroker()
defer b.Close()
testDataURL, _ := url.Parse("http://localhost:1234/data")
urlData := []byte{0, byte(len(testDataURL.String()))} // len=26
urlData = append(urlData, []byte(testDataURL.String())...)
// Ensure the appropriate message is sent to the log.
b.Log().ApplyFunc = func(data []byte) (uint64, error) {
m, _ := messaging.UnmarshalMessage(data)
@ -239,12 +251,14 @@ func TestBroker_SetTopicMaxIndex(t *testing.T) {
t.Fatalf("unexpected topic id data: %x", data[0:8])
} else if !bytes.Equal(m.Data[8:16], []byte{0, 0, 0, 0, 0, 0, 0, 2}) {
t.Fatalf("unexpected index data: %x", data[8:16])
} else if !bytes.Equal(m.Data[16:44], urlData) {
t.Fatalf("unexpected url data: %v", m.Data[16:44])
}
return 1, nil
}
// Set the highest replicated topic index.
if err := b.SetTopicMaxIndex(1, 2); err != nil {
if err := b.SetTopicMaxIndex(1, 2, *testDataURL); err != nil {
t.Fatal(err)
}
}

View File

@ -20,19 +20,24 @@ import (
const (
// DefaultReconnectTimeout is the default time to wait between when a broker
// stream disconnects and another connection is retried.
DefaultReconnectTimeout = 1000 * time.Millisecond
DefaultReconnectTimeout = 1 * time.Second
// DefaultPingInterval is the default time to wait between checks to the broker.
DefaultPingInterval = 1000 * time.Millisecond
DefaultPingInterval = 1 * time.Second
// DefaultHeartbeatInterval is the default time that a topic subscriber heartbeats
// with a broker
DefaultHeartbeatInterval = 1 * time.Second
)
// Client represents a client for the broker's HTTP API.
type Client struct {
mu sync.Mutex
path string // config file path
conns []*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
mu sync.Mutex
path string // config file path
conns []*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
dataURL url.URL // URL of the client's data node
opened bool
@ -50,10 +55,11 @@ type Client struct {
}
// NewClient returns a new instance of Client with defaults set.
func NewClient() *Client {
func NewClient(dataURL url.URL) *Client {
c := &Client{
ReconnectTimeout: DefaultReconnectTimeout,
PingInterval: DefaultPingInterval,
dataURL: dataURL,
}
return c
}
@ -340,7 +346,7 @@ func (c *Client) Conn(topicID uint64) *Conn {
defer c.mu.Unlock()
// Create connection and set current URL.
conn := NewConn(topicID)
conn := NewConn(topicID, &c.dataURL)
conn.SetURL(c.url)
// Add to list of client connections.
@ -353,11 +359,13 @@ func (c *Client) Conn(topicID uint64) *Conn {
func (c *Client) pinger(closing chan struct{}) {
defer c.wg.Done()
t := time.NewTicker(c.PingInterval)
defer t.Stop()
for {
select {
case <-closing:
return
case <-time.After(c.PingInterval):
case <-t.C:
c.Ping()
}
}
@ -407,6 +415,7 @@ type Conn struct {
index uint64 // highest index sent over the channel
streaming bool // use streaming reader, if true
url url.URL // current broker url
dataURL url.URL // url for the data node or this caller
opened bool
c chan *Message // channel streams messages from the broker.
@ -417,16 +426,21 @@ type Conn struct {
// The amount of time to wait before reconnecting to a broker stream.
ReconnectTimeout time.Duration
// The amount of time between heartbeats from data nodes to brokers
HeartbeatInterval time.Duration
// The logging interface used by the connection for out-of-band errors.
Logger *log.Logger
}
// NewConn returns a new connection to the broker for a topic.
func NewConn(topicID uint64) *Conn {
func NewConn(topicID uint64, dataURL *url.URL) *Conn {
return &Conn{
topicID: topicID,
ReconnectTimeout: DefaultReconnectTimeout,
Logger: log.New(os.Stderr, "[messaging] ", log.LstdFlags),
topicID: topicID,
dataURL: *dataURL,
ReconnectTimeout: DefaultReconnectTimeout,
HeartbeatInterval: DefaultHeartbeatInterval,
Logger: log.New(os.Stderr, "[messaging] ", log.LstdFlags),
}
}
@ -492,10 +506,10 @@ func (c *Conn) Open(index uint64, streaming bool) error {
c.c = make(chan *Message, 0)
// Start goroutines.
c.wg.Add(1)
c.wg.Add(2)
c.closing = make(chan struct{})
go c.streamer(c.closing)
go c.heartbeater(c.closing)
return nil
}
@ -531,6 +545,22 @@ func (c *Conn) close() error {
return nil
}
// heartbeater periodically heartbeats the broker its index
func (c *Conn) heartbeater(closing chan struct{}) {
defer c.wg.Done()
t := time.NewTicker(c.HeartbeatInterval)
defer t.Stop()
for {
select {
case <-closing:
return
case <-t.C:
c.Heartbeat()
}
}
}
// Heartbeat sends a heartbeat back to the broker with the client's index.
func (c *Conn) Heartbeat() error {
var resp *http.Response
@ -546,6 +576,7 @@ func (c *Conn) Heartbeat() error {
u.RawQuery = url.Values{
"topicID": {strconv.FormatUint(topicID, 10)},
"index": {strconv.FormatUint(index, 10)},
"url": {c.dataURL.String()},
}.Encode()
resp, err = http.Post(u.String(), "application/octet-stream", nil)
if err != nil {

View File

@ -14,6 +14,12 @@ import (
"github.com/influxdb/influxdb/messaging"
)
var testDataURL *url.URL
func init() {
testDataURL, _ = url.Parse("http://localhost:1234/data")
}
// Ensure a client can open the configuration file, if it exists.
func TestClient_Open_WithConfig(t *testing.T) {
// Write configuration file.
@ -477,7 +483,7 @@ func TestClient_Conn(t *testing.T) {
// Ensure that an error is returned when opening an opened connection.
func TestConn_Open_ErrConnOpen(t *testing.T) {
c := messaging.NewConn(1)
c := messaging.NewConn(1, testDataURL)
c.Open(0, false)
defer c.Close()
if err := c.Open(0, false); err != messaging.ErrConnOpen {
@ -487,7 +493,7 @@ func TestConn_Open_ErrConnOpen(t *testing.T) {
// Ensure that an error is returned when opening a previously closed connection.
func TestConn_Open_ErrConnCannotReuse(t *testing.T) {
c := messaging.NewConn(1)
c := messaging.NewConn(1, testDataURL)
c.Open(0, false)
c.Close()
if err := c.Open(0, false); err != messaging.ErrConnCannotReuse {
@ -497,7 +503,7 @@ func TestConn_Open_ErrConnCannotReuse(t *testing.T) {
// Ensure that an error is returned when closing a closed connection.
func TestConn_Close_ErrConnClosed(t *testing.T) {
c := messaging.NewConn(1)
c := messaging.NewConn(1, testDataURL)
c.Open(0, false)
c.Close()
if err := c.Close(); err != messaging.ErrConnClosed {
@ -524,7 +530,7 @@ func TestConn_Open(t *testing.T) {
defer s.Close()
// Create and open connection to server.
c := messaging.NewConn(100)
c := messaging.NewConn(100, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Open(200, false); err != nil {
t.Fatal(err)
@ -561,7 +567,7 @@ func TestConn_Open_Reconnect(t *testing.T) {
defer s.Close()
// Create and open connection to server.
c := messaging.NewConn(100)
c := messaging.NewConn(100, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Open(0, false); err != nil {
t.Fatal(err)
@ -590,12 +596,14 @@ func TestConn_Heartbeat(t *testing.T) {
t.Fatalf("unexpected topic id: %s", topicID)
} else if index := req.URL.Query().Get("index"); index != "200" {
t.Fatalf("unexpected index: %s", index)
} else if url := req.URL.Query().Get("url"); url != "http://localhost:1234/data" {
t.Fatalf("unexpected url: %s, got %s", "http://localhost:1234/data", url)
}
}))
defer s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(100)
c := messaging.NewConn(100, testDataURL)
c.SetURL(*MustParseURL(s.URL))
c.SetIndex(200)
if err := c.Heartbeat(); err != nil {
@ -609,7 +617,7 @@ func TestConn_Heartbeat_ErrConnectionRefused(t *testing.T) {
s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c := messaging.NewConn(0, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err == nil || !strings.Contains(err.Error(), `connection refused`) {
t.Fatalf("unexpected error: %s", err)
@ -625,7 +633,7 @@ func TestConn_Heartbeat_ErrNoLeader(t *testing.T) {
defer s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c := messaging.NewConn(0, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err != messaging.ErrNoLeader {
t.Fatalf("unexpected error: %s", err)
@ -641,7 +649,7 @@ func TestConn_Heartbeat_ErrBrokerError(t *testing.T) {
defer s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c := messaging.NewConn(0, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err == nil || err.Error() != `oh no` {
t.Fatalf("unexpected error: %s", err)
@ -656,7 +664,7 @@ func TestConn_Heartbeat_ErrHTTPError(t *testing.T) {
defer s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c := messaging.NewConn(0, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err == nil || err.Error() != `heartbeat error: status=500` {
t.Fatalf("unexpected error: %s", err)
@ -711,7 +719,7 @@ type Client struct {
// NewClient returns an new instance of Client.
func NewClient() *Client {
return &Client{messaging.NewClient()}
return &Client{messaging.NewClient(*testDataURL)}
}
// MustOpen opens the client. Panic on error.

View File

@ -70,6 +70,9 @@ var (
// ErrReaderClosed is returned when reading from a closed topic reader.
ErrReaderClosed = errors.New("reader closed")
// ErrURLRequired is returned when making a call without a url parameter
ErrURLRequired = errors.New("url required")
// ErrMessageDataRequired is returned when publishing a message without data.
ErrMessageDataRequired = errors.New("message data required")
)

View File

@ -21,7 +21,7 @@ type Handler struct {
LeaderURL() url.URL
TopicReader(topicID, index uint64, streaming bool) io.ReadCloser
Publish(m *Message) (uint64, error)
SetTopicMaxIndex(topicID, index uint64) error
SetTopicMaxIndex(topicID, index uint64, u url.URL) error
}
RaftHandler http.Handler
@ -160,8 +160,13 @@ func (h *Handler) postHeartbeat(w http.ResponseWriter, r *http.Request) {
return
}
u, err := url.Parse(r.URL.Query().Get("url"))
if err != nil {
h.error(w, ErrURLRequired, http.StatusBadRequest)
}
// Update the topic's highest replicated index.
if err := h.Broker.SetTopicMaxIndex(topicID, index); err == raft.ErrNotLeader {
if err := h.Broker.SetTopicMaxIndex(topicID, index, *u); err == raft.ErrNotLeader {
h.redirectToLeader(w, r)
return
} else if err != nil {

View File

@ -161,7 +161,7 @@ func TestHandler_messages_ErrMethodNotAllowed(t *testing.T) {
// Ensure a handler can receive a heartbeats.
func TestHandler_postHeartbeat(t *testing.T) {
var hb HandlerBroker
hb.SetTopicMaxIndexFunc = func(topicID, index uint64) error {
hb.SetTopicMaxIndexFunc = func(topicID, index uint64, dataURL url.URL) error {
if topicID != 1 {
t.Fatalf("unexpected topic id: %d", topicID)
} else if index != 2 {
@ -274,7 +274,7 @@ type HandlerBroker struct {
LeaderURLFunc func() url.URL
PublishFunc func(m *messaging.Message) (uint64, error)
TopicReaderFunc func(topicID, index uint64, streaming bool) io.ReadCloser
SetTopicMaxIndexFunc func(topicID, index uint64) error
SetTopicMaxIndexFunc func(topicID, index uint64, dataURL url.URL) error
}
func (b *HandlerBroker) URLs() []url.URL { return b.URLsFunc() }
@ -284,8 +284,8 @@ func (b *HandlerBroker) Publish(m *messaging.Message) (uint64, error) { return b
func (b *HandlerBroker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser {
return b.TopicReaderFunc(topicID, index, streaming)
}
func (b *HandlerBroker) SetTopicMaxIndex(topicID, index uint64) error {
return b.SetTopicMaxIndexFunc(topicID, index)
func (b *HandlerBroker) SetTopicMaxIndex(topicID, index uint64, dataURL url.URL) error {
return b.SetTopicMaxIndexFunc(topicID, index, dataURL)
}
// MustParseURL parses a string into a URL. Panic on error.

View File

@ -3368,8 +3368,8 @@ type messagingClient struct {
}
// NewMessagingClient returns an instance of MessagingClient.
func NewMessagingClient() MessagingClient {
return &messagingClient{messaging.NewClient()}
func NewMessagingClient(dataURL url.URL) MessagingClient {
return &messagingClient{messaging.NewClient(dataURL)}
}
func (c *messagingClient) Conn(topicID uint64) MessagingConn { return c.Client.Conn(topicID) }
@ -3849,3 +3849,12 @@ func (s *Server) CreateSnapshotWriter() (*SnapshotWriter, error) {
defer s.mu.RUnlock()
return createServerSnapshotWriter(s)
}
func (s *Server) URL() *url.URL {
s.mu.RLock()
defer s.mu.RUnlock()
if n := s.dataNodes[s.id]; n != nil {
return n.URL
}
return &url.URL{}
}

View File

@ -21,7 +21,7 @@ import (
// Ensure the server can be successfully opened and closed.
func TestServer_Open(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := NewServer()
defer s.Close()
@ -41,7 +41,7 @@ func TestServer_Open_ErrPathRequired(t *testing.T) { t.Skip("pending") }
// Ensure the server can create a new data node.
func TestServer_CreateDataNode(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -65,7 +65,7 @@ func TestServer_CreateDataNode(t *testing.T) {
// Ensure the server returns an error when creating a duplicate node.
func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -82,7 +82,7 @@ func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) {
// Ensure the server can delete a node.
func TestServer_DeleteDataNode(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -107,7 +107,7 @@ func TestServer_DeleteDataNode(t *testing.T) {
// Test unuathorized requests logging
func TestServer_UnauthorizedRequests(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -137,7 +137,7 @@ func TestServer_UnauthorizedRequests(t *testing.T) {
// Test user privilege authorization.
func TestServer_UserPrivilegeAuthorization(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -174,7 +174,7 @@ func TestServer_UserPrivilegeAuthorization(t *testing.T) {
// Test single statement query authorization.
func TestServer_SingleStatementQueryAuthorization(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -243,7 +243,7 @@ func TestServer_SingleStatementQueryAuthorization(t *testing.T) {
// Test multiple statement query authorization.
func TestServer_MultiStatementQueryAuthorization(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -290,7 +290,7 @@ func TestServer_MultiStatementQueryAuthorization(t *testing.T) {
// Ensure the server can create a database.
func TestServer_CreateDatabase(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -314,7 +314,7 @@ func TestServer_CreateDatabase(t *testing.T) {
// Ensure the server returns an error when creating a duplicate database.
func TestServer_CreateDatabase_ErrDatabaseExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -330,7 +330,7 @@ func TestServer_CreateDatabase_ErrDatabaseExists(t *testing.T) {
// Ensure the server can drop a database.
func TestServer_DropDatabase(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -358,7 +358,7 @@ func TestServer_DropDatabase(t *testing.T) {
// Ensure the server returns an error when dropping a database that doesn't exist.
func TestServer_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -371,7 +371,7 @@ func TestServer_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
// Ensure the server can return a list of all databases.
func TestServer_Databases(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -393,7 +393,7 @@ func TestServer_Databases(t *testing.T) {
// Ensure the server can create a new user.
func TestServer_CreateUser(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -431,7 +431,7 @@ func TestServer_CreateUser(t *testing.T) {
// Ensure the server correctly detects when there is an admin user.
func TestServer_AdminUserExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -462,7 +462,7 @@ func TestServer_AdminUserExists(t *testing.T) {
// Ensure the server returns an error when creating an user without a name.
func TestServer_CreateUser_ErrUsernameRequired(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -473,7 +473,7 @@ func TestServer_CreateUser_ErrUsernameRequired(t *testing.T) {
// Ensure the server returns an error when creating a duplicate user.
func TestServer_CreateUser_ErrUserExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -487,7 +487,7 @@ func TestServer_CreateUser_ErrUserExists(t *testing.T) {
// Ensure the server can delete an existing user.
func TestServer_DeleteUser(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -514,7 +514,7 @@ func TestServer_DeleteUser(t *testing.T) {
// Ensure the server can return a list of all users.
func TestServer_Users(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -536,7 +536,7 @@ func TestServer_Users(t *testing.T) {
// Ensure the server does not return non-existent users
func TestServer_NonExistingUsers(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -559,7 +559,7 @@ func TestServer_NonExistingUsers(t *testing.T) {
// Ensure the database can create a new retention policy.
func TestServer_CreateRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -593,7 +593,7 @@ func TestServer_CreateRetentionPolicy(t *testing.T) {
// Ensure the database can create a new retention policy with infinite duration.
func TestServer_CreateRetentionPolicyInfinite(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -628,7 +628,7 @@ func TestServer_CreateRetentionPolicyInfinite(t *testing.T) {
// Ensure the database can creates a default retention policy.
func TestServer_CreateRetentionPolicyDefault(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -662,7 +662,7 @@ func TestServer_CreateRetentionPolicyDefault(t *testing.T) {
// Ensure the server returns an error when creating a retention policy with an invalid db.
func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -673,7 +673,7 @@ func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
// Ensure the server returns an error when creating a retention policy without a name.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -685,7 +685,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.
// Ensure the server returns an error when creating a duplicate retention policy.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -698,7 +698,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) {
// Ensure the server returns an error when creating a retention policy with a duration less than one hour.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyMinDuration(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -710,7 +710,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyMinDuration(t *testing.T
// Ensure the database can alter an existing retention policy.
func TestServer_AlterRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -795,7 +795,7 @@ func TestServer_AlterRetentionPolicy(t *testing.T) {
// Ensure the server an error is returned if trying to alter a retention policy with a duration too small.
func TestServer_AlterRetentionPolicy_Minduration(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -850,7 +850,7 @@ func TestServer_AlterRetentionPolicy_Minduration(t *testing.T) {
// Ensure the server can delete an existing retention policy.
func TestServer_DeleteRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -878,7 +878,7 @@ func TestServer_DeleteRetentionPolicy(t *testing.T) {
// Ensure the server returns an error when deleting a retention policy on invalid db.
func TestServer_DeleteRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -889,7 +889,7 @@ func TestServer_DeleteRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
// Ensure the server returns an error when deleting a retention policy without a name.
func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -901,7 +901,7 @@ func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.
// Ensure the server returns an error when deleting a non-existent retention policy.
func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -913,7 +913,7 @@ func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
// Ensure the server can set the default retention policy
func TestServer_SetDefaultRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -948,7 +948,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) {
// Ensure the server returns an error when setting the default retention policy to a non-existant one.
func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -960,7 +960,7 @@ func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.
// Ensure the server pre-creates shard groups as needed.
func TestServer_PreCreateRetentionPolices(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -997,7 +997,7 @@ func TestServer_PreCreateRetentionPolices(t *testing.T) {
// Ensure the server prohibits a zero check interval for retention policy enforcement.
func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1007,7 +1007,7 @@ func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) {
}
func TestServer_EnforceRetentionPolices(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1044,7 +1044,7 @@ func TestServer_EnforceRetentionPolices(t *testing.T) {
// Ensure the server can drop a measurement.
func TestServer_DropMeasurement(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1107,7 +1107,7 @@ func TestServer_DropMeasurement(t *testing.T) {
// Ensure the server can handles drop measurement if none exists.
func TestServer_DropMeasurementNoneExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1138,7 +1138,7 @@ func TestServer_DropMeasurementNoneExists(t *testing.T) {
// Ensure the server can drop a series.
func TestServer_DropSeries(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1182,7 +1182,7 @@ func TestServer_DropSeries(t *testing.T) {
// Ensure the server can drop a series from measurement when more than one shard exists.
func TestServer_DropSeriesFromMeasurement(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1228,7 +1228,7 @@ func TestServer_DropSeriesFromMeasurement(t *testing.T) {
// ensure that the dropped series is gone
// ensure that we can still query: select value from cpu where region=uswest
func TestServer_DropSeriesTagsPreserved(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1302,7 +1302,7 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) {
// Ensure the server respects limit and offset in show series queries
func TestServer_ShowSeriesLimitOffset(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1365,7 +1365,7 @@ func TestServer_ShowSeriesLimitOffset(t *testing.T) {
}
func TestServer_CreateShardGroupIfNotExist(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1390,7 +1390,7 @@ func TestServer_CreateShardGroupIfNotExist(t *testing.T) {
}
func TestServer_DeleteShardGroup(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1428,7 +1428,7 @@ func TestServer_DeleteShardGroup(t *testing.T) {
/* TODO(benbjohnson): Change test to not expose underlying series ids directly.
func TestServer_Measurements(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1499,7 +1499,7 @@ func TestServer_NormalizeMeasurement(t *testing.T) {
}
// Create server with a variety of databases, retention policies, and measurements
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1550,7 +1550,7 @@ func TestServer_NormalizeQuery(t *testing.T) {
}
// Start server with database & retention policy.
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1572,7 +1572,7 @@ func TestServer_NormalizeQuery(t *testing.T) {
// Ensure the server can create a continuous query
func TestServer_CreateContinuousQuery(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1632,7 +1632,7 @@ func TestServer_CreateContinuousQuery_ErrInfinteLoop(t *testing.T) {
}
func TestServer_DropContinuousQuery(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1691,7 +1691,7 @@ func TestServer_DropContinuousQuery(t *testing.T) {
// Ensure
func TestServer_RunContinuousQueries(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1771,7 +1771,7 @@ func TestServer_RunContinuousQueries(t *testing.T) {
// Ensure the server can create a snapshot writer.
func TestServer_CreateSnapshotWriter(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()

View File

@ -9,11 +9,17 @@ import (
"github.com/influxdb/influxdb/messaging"
)
func init() {
// Ensure the broker matches the handler's interface.
_ = messaging.Handler{Broker: messaging.NewBroker()}
}
// MessagingClient represents a test client for the messaging broker.
type MessagingClient struct {
mu sync.Mutex
index uint64 // highest index
conns []*MessagingConn // list of all connections
mu sync.Mutex
index uint64 // highest index
conns []*MessagingConn // list of all connections
dataURL url.URL // clients data node URL
messagesByTopicID map[uint64][]*messaging.Message // message by topic
@ -22,15 +28,22 @@ type MessagingClient struct {
}
// NewMessagingClient returns a new instance of MessagingClient.
func NewMessagingClient() *MessagingClient {
func NewMessagingClient(dataURL url.URL) *MessagingClient {
c := &MessagingClient{
messagesByTopicID: make(map[uint64][]*messaging.Message),
dataURL: dataURL,
}
c.PublishFunc = c.DefaultPublishFunc
c.ConnFunc = c.DefaultConnFunc
return c
}
// NewMessagingClient returns a new instance of MessagingClient.
func NewDefaultMessagingClient() *MessagingClient {
testDataURL, _ := url.Parse("http://localhost:1234/data")
return NewMessagingClient(*testDataURL)
}
func (c *MessagingClient) Open(path string) error { return nil }
// Close closes all open connections.
@ -82,7 +95,7 @@ func (c *MessagingClient) DefaultConnFunc(topicID uint64) influxdb.MessagingConn
defer c.mu.Unlock()
// Create new connection.
conn := NewMessagingConn(topicID)
conn := NewMessagingConn(topicID, c.dataURL)
// Track connections.
c.conns = append(c.conns, conn)
@ -111,13 +124,15 @@ type MessagingConn struct {
mu sync.Mutex
topicID uint64
index uint64
dataURL url.URL
c chan *messaging.Message
}
// NewMessagingConn returns a new instance of MessagingConn.
func NewMessagingConn(topicID uint64) *MessagingConn {
func NewMessagingConn(topicID uint64, dataURL url.URL) *MessagingConn {
return &MessagingConn{
topicID: topicID,
dataURL: dataURL,
}
}