Track data node urls on brokers

This sends data node urls via the broker heartbeat from each data
node.  The urls are tracked on the broker to support simpler
cluster setup as well as distributed queries.
pull/2128/head
Jason Wilder 2015-03-30 16:41:56 -06:00
parent 0af6d1006d
commit 91fb7e3756
14 changed files with 320 additions and 179 deletions

View File

@ -55,7 +55,7 @@ func TestRestoreCommand(t *testing.T) {
}
// Start server.
b, s := main.Run(c, "", "x.x")
b, s, l := main.Run(c, "", "x.x")
if b == nil {
t.Fatal("cannot run broker")
} else if s == nil {
@ -88,6 +88,7 @@ func TestRestoreCommand(t *testing.T) {
f.Close()
// Stop server.
l.Close()
s.Close()
b.Close()
@ -109,7 +110,7 @@ func TestRestoreCommand(t *testing.T) {
}
// Restart server.
b, s = main.Run(c, "", "x.x")
b, s, l = main.Run(c, "", "x.x")
if b == nil {
t.Fatal("cannot run broker")
} else if s == nil {
@ -139,6 +140,11 @@ func TestRestoreCommand(t *testing.T) {
} else if !reflect.DeepEqual(v, map[string]interface{}{"value": float64(1000)}) {
t.Fatalf("read series(1) mismatch: %#v", v)
}
// Stop server.
l.Close()
s.Close()
b.Close()
}
// RestoreCommand is a test wrapper for main.RestoreCommand.

View File

@ -23,7 +23,7 @@ import (
"github.com/influxdb/influxdb/udp"
)
func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Server) {
func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Server, *raft.Log) {
log.Printf("influxdb started, version %s, commit %s", version, commit)
var initBroker, initServer bool
@ -217,7 +217,7 @@ func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Ser
}
}
return b.Broker, s
return b.Broker, s, l
}
// write the current process id to a file specified by path.
@ -327,7 +327,7 @@ func openServer(config *Config, b *influxdb.Broker, initServer, initBroker bool,
}
// Create messaging client to the brokers.
c := influxdb.NewMessagingClient()
c := influxdb.NewMessagingClient(config.DataURL())
if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile)); err != nil {
log.Fatalf("messaging client error: %s", err)
}

View File

@ -19,6 +19,7 @@ import (
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/raft"
"github.com/influxdb/influxdb/client"
main "github.com/influxdb/influxdb/cmd/influxd"
@ -59,6 +60,7 @@ func rewriteDbRp(old, database, retention string) string {
type Node struct {
broker *messaging.Broker
server *influxdb.Server
log *raft.Log
url *url.URL
leader bool
}
@ -66,6 +68,26 @@ type Node struct {
// Cluster represents a multi-node cluster.
type Cluster []*Node
func (c *Cluster) Close() {
for _, n := range *c {
if n.log != nil {
n.log.Close()
n.log = nil
}
if n.server != nil {
n.server.Close()
n.server = nil
}
if n.broker != nil {
n.broker.Close()
n.broker = nil
}
}
}
// createCombinedNodeCluster creates a cluster of nServers nodes, each of which
// runs as both a Broker and Data node. If any part cluster creation fails,
// the testing is marked as failed.
@ -102,7 +124,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba
c.ReportingDisabled = true
c.Snapshot.Enabled = false
b, s := main.Run(c, "", "x.x")
b, s, l := main.Run(c, "", "x.x")
if b == nil {
t.Fatalf("Test %s: failed to create broker on port %d", testName, basePort)
}
@ -112,6 +134,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba
nodes = append(nodes, &Node{
broker: b,
server: s,
log: l,
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(basePort)},
leader: true,
})
@ -124,7 +147,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba
c.Broker.Port = nextPort
c.Data.Port = nextPort
b, s := main.Run(c, "http://localhost:"+strconv.Itoa(basePort), "x.x")
b, s, l := main.Run(c, "http://localhost:"+strconv.Itoa(basePort), "x.x")
if b == nil {
t.Fatalf("Test %s: failed to create following broker on port %d", testName, basePort)
}
@ -135,6 +158,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba
nodes = append(nodes, &Node{
broker: b,
server: s,
log: l,
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(nextPort)},
})
}
@ -1243,6 +1267,7 @@ func TestSingleServer(t *testing.T) {
}()
nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090, nil)
defer nodes.Close()
runTestsData(t, testName, nodes, "mydb", "myrp")
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp")

View File

@ -139,7 +139,7 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) {
// Ensure that even if a measurement is not found, that the status code is still 200
func TestHandler_ShowMeasurementsNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -163,7 +163,7 @@ func TestHandler_ShowMeasurementsNotFound(t *testing.T) {
}
func TestHandler_CreateDatabase(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -178,7 +178,7 @@ func TestHandler_CreateDatabase(t *testing.T) {
}
func TestHandler_CreateDatabase_BadRequest_NoName(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -191,7 +191,7 @@ func TestHandler_CreateDatabase_BadRequest_NoName(t *testing.T) {
}
func TestHandler_CreateDatabase_Conflict(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -207,7 +207,7 @@ func TestHandler_CreateDatabase_Conflict(t *testing.T) {
}
func TestHandler_DropDatabase(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -223,7 +223,7 @@ func TestHandler_DropDatabase(t *testing.T) {
}
func TestHandler_DropDatabase_NotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -238,7 +238,7 @@ func TestHandler_DropDatabase_NotFound(t *testing.T) {
}
func TestHandler_RetentionPolicies(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -256,7 +256,7 @@ func TestHandler_RetentionPolicies(t *testing.T) {
}
func TestHandler_RetentionPolicies_DatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -272,7 +272,7 @@ func TestHandler_RetentionPolicies_DatabaseNotFound(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -290,7 +290,7 @@ func TestHandler_CreateRetentionPolicy(t *testing.T) {
}
func TestHandler_CreateRetentionPolicyAsDefault(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -315,7 +315,7 @@ func TestHandler_CreateRetentionPolicyAsDefault(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy_DatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -330,7 +330,7 @@ func TestHandler_CreateRetentionPolicy_DatabaseNotFound(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy_Conflict(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -348,7 +348,7 @@ func TestHandler_CreateRetentionPolicy_Conflict(t *testing.T) {
}
func TestHandler_CreateRetentionPolicy_BadRequest(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -364,7 +364,7 @@ func TestHandler_CreateRetentionPolicy_BadRequest(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -396,7 +396,7 @@ func TestHandler_UpdateRetentionPolicy(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy_BadRequest(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -414,7 +414,7 @@ func TestHandler_UpdateRetentionPolicy_BadRequest(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy_DatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -430,7 +430,7 @@ func TestHandler_UpdateRetentionPolicy_DatabaseNotFound(t *testing.T) {
}
func TestHandler_UpdateRetentionPolicy_NotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -448,7 +448,7 @@ func TestHandler_UpdateRetentionPolicy_NotFound(t *testing.T) {
}
func TestHandler_DeleteRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -467,7 +467,7 @@ func TestHandler_DeleteRetentionPolicy(t *testing.T) {
}
func TestHandler_DeleteRetentionPolicy_DatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -484,7 +484,7 @@ func TestHandler_DeleteRetentionPolicy_DatabaseNotFound(t *testing.T) {
}
func TestHandler_DeleteRetentionPolicy_NotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -502,7 +502,7 @@ func TestHandler_DeleteRetentionPolicy_NotFound(t *testing.T) {
}
func TestHandler_GzipEnabled(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -528,7 +528,7 @@ func TestHandler_GzipEnabled(t *testing.T) {
}
func TestHandler_GzipDisabled(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -554,7 +554,7 @@ func TestHandler_GzipDisabled(t *testing.T) {
}
func TestHandler_Index(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -572,7 +572,7 @@ func TestHandler_Index(t *testing.T) {
}
func TestHandler_Wait(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -590,7 +590,7 @@ func TestHandler_Wait(t *testing.T) {
}
func TestHandler_WaitIncrement(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -610,7 +610,7 @@ func TestHandler_WaitIncrement(t *testing.T) {
}
func TestHandler_WaitNoIndexSpecified(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -624,7 +624,7 @@ func TestHandler_WaitNoIndexSpecified(t *testing.T) {
}
func TestHandler_WaitInvalidIndexSpecified(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -638,7 +638,7 @@ func TestHandler_WaitInvalidIndexSpecified(t *testing.T) {
}
func TestHandler_WaitExpectTimeout(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -652,7 +652,7 @@ func TestHandler_WaitExpectTimeout(t *testing.T) {
}
func TestHandler_Ping(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -666,7 +666,7 @@ func TestHandler_Ping(t *testing.T) {
}
func TestHandler_PingHead(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -680,7 +680,7 @@ func TestHandler_PingHead(t *testing.T) {
}
func TestHandler_Users_MultipleUsers(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateUser("jdoe", "1337", false)
@ -700,7 +700,7 @@ func TestHandler_Users_MultipleUsers(t *testing.T) {
func TestHandler_UpdateUser(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateUser("jdoe", "1337", false)
@ -723,7 +723,7 @@ func TestHandler_UpdateUser(t *testing.T) {
func TestHandler_UpdateUser_PasswordBadRequest(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateUser("jdoe", "1337", false)
@ -740,7 +740,7 @@ func TestHandler_UpdateUser_PasswordBadRequest(t *testing.T) {
func TestHandler_DataNodes(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenUninitializedServer(c)
srvr.CreateDataNode(MustParseURL("http://localhost:1000"))
@ -759,7 +759,7 @@ func TestHandler_DataNodes(t *testing.T) {
func TestHandler_CreateDataNode(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenUninitializedServer(c)
s := NewHTTPServer(srvr)
@ -775,7 +775,7 @@ func TestHandler_CreateDataNode(t *testing.T) {
func TestHandler_CreateDataNode_BadRequest(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -791,7 +791,7 @@ func TestHandler_CreateDataNode_BadRequest(t *testing.T) {
func TestHandler_CreateDataNode_InternalServerError(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -807,7 +807,7 @@ func TestHandler_CreateDataNode_InternalServerError(t *testing.T) {
func TestHandler_DeleteDataNode(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDataNode(MustParseURL("http://localhost:1000"))
@ -824,7 +824,7 @@ func TestHandler_DeleteDataNode(t *testing.T) {
func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -841,7 +841,7 @@ func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) {
// Perform a subset of endpoint testing, with authentication enabled.
func TestHandler_AuthenticatedCreateAdminUser(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewAuthenticatedHTTPServer(srvr)
@ -864,7 +864,7 @@ func TestHandler_AuthenticatedCreateAdminUser(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_Unauthorized(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewAuthenticatedHTTPServer(srvr)
@ -877,7 +877,7 @@ func TestHandler_AuthenticatedDatabases_Unauthorized(t *testing.T) {
}
func TestHandler_QueryParamenterMissing(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -892,7 +892,7 @@ func TestHandler_QueryParamenterMissing(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_AuthorizedQueryParams(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
@ -907,7 +907,7 @@ func TestHandler_AuthenticatedDatabases_AuthorizedQueryParams(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_UnauthorizedQueryParams(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
@ -922,7 +922,7 @@ func TestHandler_AuthenticatedDatabases_UnauthorizedQueryParams(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_AuthorizedBasicAuth(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
@ -939,7 +939,7 @@ func TestHandler_AuthenticatedDatabases_AuthorizedBasicAuth(t *testing.T) {
}
func TestHandler_AuthenticatedDatabases_UnauthorizedBasicAuth(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateUser("lisa", "password", true)
@ -956,7 +956,7 @@ func TestHandler_AuthenticatedDatabases_UnauthorizedBasicAuth(t *testing.T) {
}
func TestHandler_GrantDBPrivilege(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
// Create a cluster admin that will grant privilege to "john".
@ -995,7 +995,7 @@ func TestHandler_GrantDBPrivilege(t *testing.T) {
}
func TestHandler_RevokeAdmin(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
// Create a cluster admin that will revoke admin from "john".
@ -1029,7 +1029,7 @@ func TestHandler_RevokeAdmin(t *testing.T) {
}
func TestHandler_RevokeDBPrivilege(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
// Create a cluster admin that will revoke privilege from "john".
@ -1065,7 +1065,7 @@ func TestHandler_RevokeDBPrivilege(t *testing.T) {
}
func TestHandler_DropSeries(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1088,7 +1088,7 @@ func TestHandler_DropSeries(t *testing.T) {
}
func TestHandler_serveWriteSeries(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1111,7 +1111,7 @@ func TestHandler_serveWriteSeries(t *testing.T) {
}
func TestHandler_serveDump(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1142,7 +1142,7 @@ func TestHandler_serveDump(t *testing.T) {
}
func TestHandler_serveWriteSeriesWithNoFields(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateDatabase("foo")
@ -1162,7 +1162,7 @@ func TestHandler_serveWriteSeriesWithNoFields(t *testing.T) {
}
func TestHandler_serveWriteSeriesWithAuthNilUser(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
srvr.CreateDatabase("foo")
@ -1183,7 +1183,7 @@ func TestHandler_serveWriteSeriesWithAuthNilUser(t *testing.T) {
}
func TestHandler_serveWriteSeries_noDatabaseExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewHTTPServer(srvr)
@ -1203,7 +1203,7 @@ func TestHandler_serveWriteSeries_noDatabaseExists(t *testing.T) {
}
func TestHandler_serveWriteSeries_errorHasJsonContentType(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
s := NewHTTPServer(srvr)
@ -1230,7 +1230,7 @@ func TestHandler_serveWriteSeries_errorHasJsonContentType(t *testing.T) {
}
func TestHandler_serveWriteSeries_queryHasJsonContentType(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1290,7 +1290,7 @@ func TestHandler_serveWriteSeries_queryHasJsonContentType(t *testing.T) {
}
func TestHandler_serveWriteSeries_invalidJSON(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewHTTPServer(srvr)
@ -1309,7 +1309,7 @@ func TestHandler_serveWriteSeries_invalidJSON(t *testing.T) {
}
func TestHandler_serveWriteSeries_noDatabaseSpecified(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewHTTPServer(srvr)
@ -1328,7 +1328,7 @@ func TestHandler_serveWriteSeries_noDatabaseSpecified(t *testing.T) {
}
func TestHandler_serveWriteSeriesNonZeroTime(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1371,7 +1371,7 @@ func TestHandler_serveWriteSeriesNonZeroTime(t *testing.T) {
}
func TestHandler_serveWriteSeriesZeroTime(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1426,7 +1426,7 @@ func TestHandler_serveWriteSeriesZeroTime(t *testing.T) {
}
func TestHandler_serveWriteSeriesBatch(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1511,7 +1511,7 @@ func TestHandler_serveWriteSeriesBatch(t *testing.T) {
}
func TestHandler_serveWriteSeriesFieldTypeConflict(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
@ -1554,7 +1554,7 @@ func str2iface(strs []string) []interface{} {
}
func TestHandler_ProcessContinousQueries(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthenticatedServer(c)
s := NewAuthenticatedHTTPServer(srvr)

View File

@ -472,37 +472,51 @@ func (b *Broker) TopicReader(topicID, index uint64, streaming bool) io.ReadClose
return NewTopicReader(b.TopicPath(topicID), index, streaming)
}
// SetTopicMaxIndex updates the highest replicated index for a topic.
// SetTopicMaxIndex updates the highest replicated index for a topic and data URL.
// If a higher index is already set on the topic then the call is ignored.
// This index is only held in memory and is used for topic segment reclamation.
func (b *Broker) SetTopicMaxIndex(topicID, index uint64) error {
// The higheset replciated index per data URL is tracked separately from the current index
func (b *Broker) SetTopicMaxIndex(topicID, index uint64, u url.URL) error {
_, err := b.Publish(&Message{
Type: SetTopicMaxIndexMessageType,
Data: marshalTopicIndex(topicID, index),
Data: marshalTopicIndex(topicID, index, u),
})
return err
}
func (b *Broker) applySetTopicMaxIndex(m *Message) {
topicID, index := unmarshalTopicIndex(m.Data)
topicID, index, u := unmarshalTopicIndex(m.Data)
// Set index if it's not already set higher.
t := b.topics[topicID]
if t != nil && t.index < index {
t.index = index
if t := b.topics[topicID]; t != nil {
// Track the highest replicated index per data node URL
t.indexByURL[u] = index
if t.index < index {
t.index = index
}
}
}
func marshalTopicIndex(topicID, index uint64) []byte {
b := make([]byte, 16)
func marshalTopicIndex(topicID, index uint64, u url.URL) []byte {
s := []byte(u.String())
b := make([]byte, 16+2+len(s))
binary.BigEndian.PutUint64(b[0:8], topicID)
binary.BigEndian.PutUint64(b[8:16], index)
binary.BigEndian.PutUint16(b[16:18], uint16(len(s))) // URL string length
n := copy(b[18:], s) // URL string
assert(n == len(s), "marshal topic index too short. have %d, expectd %d", n, len(s))
return b
}
func unmarshalTopicIndex(b []byte) (topicID, index uint64) {
func unmarshalTopicIndex(b []byte) (topicID, index uint64, u url.URL) {
assert(len(b) >= 18, "unmarshal topic index too short. have %d, expected %d", len(b), 20)
topicID = binary.BigEndian.Uint64(b[0:8])
index = binary.BigEndian.Uint64(b[8:16])
n := binary.BigEndian.Uint16(b[16:18]) // URL length
du, err := url.Parse(string(b[18 : 18+n])) // URL
assert(err == nil, "unmarshal binary error: %s", err)
u = *du
return
}
@ -619,9 +633,13 @@ const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB
type Topic struct {
mu sync.Mutex
id uint64 // unique identifier
index uint64 // highest index replicated
index uint64 // current index
path string // on-disk path
// highest index replicated per data url. The unique set of keys across all topics
// provides a snapshot of the addresses of every data node in a cluster.
indexByURL map[url.URL]uint64
file *os.File // last segment writer
opened bool
@ -632,9 +650,9 @@ type Topic struct {
// NewTopic returns a new instance of Topic.
func NewTopic(id uint64, path string) *Topic {
return &Topic{
id: id,
path: path,
id: id,
path: path,
indexByURL: make(map[url.URL]uint64),
MaxSegmentSize: DefaultMaxSegmentSize,
}
}
@ -652,6 +670,13 @@ func (t *Topic) Index() uint64 {
return t.index
}
// IndexForURL returns the highest index replicated for a given data URL.
func (t *Topic) IndexForURL(u url.URL) uint64 {
t.mu.Lock()
defer t.mu.Unlock()
return t.indexByURL[u]
}
// SegmentPath returns the path to a segment starting with a given log index.
func (t *Topic) SegmentPath(index uint64) string {
t.mu.Lock()

View File

@ -120,17 +120,26 @@ func TestBroker_Apply_SetMaxTopicIndex(t *testing.T) {
t.Fatal("topic not created")
}
testDataURL, _ := url.Parse("http://localhost:1234/data")
data := []byte{0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 5} // topicID=20, index=5,
data = append(data, []byte{0, byte(len(testDataURL.String()))}...) // len= <url length>
data = append(data, []byte(testDataURL.String())...)
// Set topic #1's index to "2".
if err := b.Apply(&messaging.Message{
Index: 2,
Type: messaging.SetTopicMaxIndexMessageType,
Data: []byte{0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 5}, // topicID=20, index=5
Data: data,
}); err != nil {
t.Fatalf("apply error: %s", err)
}
if topic := b.Topic(20); topic.Index() != 5 {
t.Fatalf("unexpected topic index: %d", topic.Index())
}
if topic := b.Topic(20); topic.IndexForURL(*testDataURL) != 5 {
t.Fatalf("unexpected topic url index: %d", topic.IndexForURL(*testDataURL))
}
}
// Ensure the broker can read from topics after reopening.
@ -232,6 +241,9 @@ func TestBroker_SetTopicMaxIndex(t *testing.T) {
b := OpenBroker()
defer b.Close()
testDataURL, _ := url.Parse("http://localhost:1234/data")
urlData := []byte{0, byte(len(testDataURL.String()))} // len=26
urlData = append(urlData, []byte(testDataURL.String())...)
// Ensure the appropriate message is sent to the log.
b.Log().ApplyFunc = func(data []byte) (uint64, error) {
m, _ := messaging.UnmarshalMessage(data)
@ -239,12 +251,14 @@ func TestBroker_SetTopicMaxIndex(t *testing.T) {
t.Fatalf("unexpected topic id data: %x", data[0:8])
} else if !bytes.Equal(m.Data[8:16], []byte{0, 0, 0, 0, 0, 0, 0, 2}) {
t.Fatalf("unexpected index data: %x", data[8:16])
} else if !bytes.Equal(m.Data[16:44], urlData) {
t.Fatalf("unexpected url data: %v", m.Data[16:44])
}
return 1, nil
}
// Set the highest replicated topic index.
if err := b.SetTopicMaxIndex(1, 2); err != nil {
if err := b.SetTopicMaxIndex(1, 2, *testDataURL); err != nil {
t.Fatal(err)
}
}

View File

@ -24,15 +24,20 @@ const (
// DefaultPingInterval is the default time to wait between checks to the broker.
DefaultPingInterval = 1000 * time.Millisecond
// DefaultHeartbeatInterval is the default time that a topic subscriber heartbeats
// with a broker
DefaultHeartbeatInterval = 1000 * time.Millisecond
)
// Client represents a client for the broker's HTTP API.
type Client struct {
mu sync.Mutex
path string // config file path
conns []*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
mu sync.Mutex
path string // config file path
conns []*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
dataURL url.URL // URL of the client's data node
opened bool
@ -50,10 +55,11 @@ type Client struct {
}
// NewClient returns a new instance of Client with defaults set.
func NewClient() *Client {
func NewClient(dataURL url.URL) *Client {
c := &Client{
ReconnectTimeout: DefaultReconnectTimeout,
PingInterval: DefaultPingInterval,
dataURL: dataURL,
}
return c
}
@ -340,7 +346,7 @@ func (c *Client) Conn(topicID uint64) *Conn {
defer c.mu.Unlock()
// Create connection and set current URL.
conn := NewConn(topicID)
conn := NewConn(topicID, &c.dataURL)
conn.SetURL(c.url)
// Add to list of client connections.
@ -353,11 +359,13 @@ func (c *Client) Conn(topicID uint64) *Conn {
func (c *Client) pinger(closing chan struct{}) {
defer c.wg.Done()
t := time.NewTicker(c.PingInterval)
defer t.Stop()
for {
select {
case <-closing:
return
case <-time.After(c.PingInterval):
case <-t.C:
c.Ping()
}
}
@ -407,6 +415,7 @@ type Conn struct {
index uint64 // highest index sent over the channel
streaming bool // use streaming reader, if true
url url.URL // current broker url
dataURL url.URL // url for the data node or this caller
opened bool
c chan *Message // channel streams messages from the broker.
@ -417,16 +426,21 @@ type Conn struct {
// The amount of time to wait before reconnecting to a broker stream.
ReconnectTimeout time.Duration
// The amount of time between heartbeats from data nodes to brokers
HeartbeatInterval time.Duration
// The logging interface used by the connection for out-of-band errors.
Logger *log.Logger
}
// NewConn returns a new connection to the broker for a topic.
func NewConn(topicID uint64) *Conn {
func NewConn(topicID uint64, dataURL *url.URL) *Conn {
return &Conn{
topicID: topicID,
ReconnectTimeout: DefaultReconnectTimeout,
Logger: log.New(os.Stderr, "[messaging] ", log.LstdFlags),
topicID: topicID,
dataURL: *dataURL,
ReconnectTimeout: DefaultReconnectTimeout,
HeartbeatInterval: DefaultHeartbeatInterval,
Logger: log.New(os.Stderr, "[messaging] ", log.LstdFlags),
}
}
@ -492,10 +506,10 @@ func (c *Conn) Open(index uint64, streaming bool) error {
c.c = make(chan *Message, 0)
// Start goroutines.
c.wg.Add(1)
c.wg.Add(2)
c.closing = make(chan struct{})
go c.streamer(c.closing)
go c.heartbeater(c.closing)
return nil
}
@ -531,6 +545,22 @@ func (c *Conn) close() error {
return nil
}
// heartbeater periodically heartbeats the broker its index
func (c *Conn) heartbeater(closing chan struct{}) {
defer c.wg.Done()
t := time.NewTicker(c.HeartbeatInterval)
defer t.Stop()
for {
select {
case <-closing:
return
case <-t.C:
c.Heartbeat()
}
}
}
// Heartbeat sends a heartbeat back to the broker with the client's index.
func (c *Conn) Heartbeat() error {
var resp *http.Response
@ -546,6 +576,7 @@ func (c *Conn) Heartbeat() error {
u.RawQuery = url.Values{
"topicID": {strconv.FormatUint(topicID, 10)},
"index": {strconv.FormatUint(index, 10)},
"url": {c.dataURL.String()},
}.Encode()
resp, err = http.Post(u.String(), "application/octet-stream", nil)
if err != nil {

View File

@ -14,6 +14,12 @@ import (
"github.com/influxdb/influxdb/messaging"
)
var testDataURL *url.URL
func init() {
testDataURL, _ = url.Parse("http://localhost:1234/data")
}
// Ensure a client can open the configuration file, if it exists.
func TestClient_Open_WithConfig(t *testing.T) {
// Write configuration file.
@ -477,7 +483,7 @@ func TestClient_Conn(t *testing.T) {
// Ensure that an error is returned when opening an opened connection.
func TestConn_Open_ErrConnOpen(t *testing.T) {
c := messaging.NewConn(1)
c := messaging.NewConn(1, testDataURL)
c.Open(0, false)
defer c.Close()
if err := c.Open(0, false); err != messaging.ErrConnOpen {
@ -487,7 +493,7 @@ func TestConn_Open_ErrConnOpen(t *testing.T) {
// Ensure that an error is returned when opening a previously closed connection.
func TestConn_Open_ErrConnCannotReuse(t *testing.T) {
c := messaging.NewConn(1)
c := messaging.NewConn(1, testDataURL)
c.Open(0, false)
c.Close()
if err := c.Open(0, false); err != messaging.ErrConnCannotReuse {
@ -497,7 +503,7 @@ func TestConn_Open_ErrConnCannotReuse(t *testing.T) {
// Ensure that an error is returned when closing a closed connection.
func TestConn_Close_ErrConnClosed(t *testing.T) {
c := messaging.NewConn(1)
c := messaging.NewConn(1, testDataURL)
c.Open(0, false)
c.Close()
if err := c.Close(); err != messaging.ErrConnClosed {
@ -524,7 +530,7 @@ func TestConn_Open(t *testing.T) {
defer s.Close()
// Create and open connection to server.
c := messaging.NewConn(100)
c := messaging.NewConn(100, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Open(200, false); err != nil {
t.Fatal(err)
@ -561,7 +567,7 @@ func TestConn_Open_Reconnect(t *testing.T) {
defer s.Close()
// Create and open connection to server.
c := messaging.NewConn(100)
c := messaging.NewConn(100, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Open(0, false); err != nil {
t.Fatal(err)
@ -590,12 +596,14 @@ func TestConn_Heartbeat(t *testing.T) {
t.Fatalf("unexpected topic id: %s", topicID)
} else if index := req.URL.Query().Get("index"); index != "200" {
t.Fatalf("unexpected index: %s", index)
} else if url := req.URL.Query().Get("url"); url != "http://localhost:1234/data" {
t.Fatalf("unexpected url: %s, got %s", "http://localhost:1234/data", url)
}
}))
defer s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(100)
c := messaging.NewConn(100, testDataURL)
c.SetURL(*MustParseURL(s.URL))
c.SetIndex(200)
if err := c.Heartbeat(); err != nil {
@ -609,7 +617,7 @@ func TestConn_Heartbeat_ErrConnectionRefused(t *testing.T) {
s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c := messaging.NewConn(0, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err == nil || !strings.Contains(err.Error(), `connection refused`) {
t.Fatalf("unexpected error: %s", err)
@ -625,7 +633,7 @@ func TestConn_Heartbeat_ErrNoLeader(t *testing.T) {
defer s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c := messaging.NewConn(0, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err != messaging.ErrNoLeader {
t.Fatalf("unexpected error: %s", err)
@ -641,7 +649,7 @@ func TestConn_Heartbeat_ErrBrokerError(t *testing.T) {
defer s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c := messaging.NewConn(0, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err == nil || err.Error() != `oh no` {
t.Fatalf("unexpected error: %s", err)
@ -656,7 +664,7 @@ func TestConn_Heartbeat_ErrHTTPError(t *testing.T) {
defer s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c := messaging.NewConn(0, testDataURL)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err == nil || err.Error() != `heartbeat error: status=500` {
t.Fatalf("unexpected error: %s", err)
@ -711,7 +719,7 @@ type Client struct {
// NewClient returns an new instance of Client.
func NewClient() *Client {
return &Client{messaging.NewClient()}
return &Client{messaging.NewClient(*testDataURL)}
}
// MustOpen opens the client. Panic on error.

View File

@ -70,6 +70,9 @@ var (
// ErrReaderClosed is returned when reading from a closed topic reader.
ErrReaderClosed = errors.New("reader closed")
// ErrURLRequired is returned when making a call without a url parameter
ErrURLRequired = errors.New("url required")
// ErrMessageDataRequired is returned when publishing a message without data.
ErrMessageDataRequired = errors.New("message data required")
)

View File

@ -21,7 +21,7 @@ type Handler struct {
LeaderURL() url.URL
TopicReader(topicID, index uint64, streaming bool) io.ReadCloser
Publish(m *Message) (uint64, error)
SetTopicMaxIndex(topicID, index uint64) error
SetTopicMaxIndex(topicID, index uint64, u url.URL) error
}
RaftHandler http.Handler
@ -160,8 +160,13 @@ func (h *Handler) postHeartbeat(w http.ResponseWriter, r *http.Request) {
return
}
u, err := url.Parse(r.URL.Query().Get("url"))
if err != nil {
h.error(w, ErrURLRequired, http.StatusBadRequest)
}
// Update the topic's highest replicated index.
if err := h.Broker.SetTopicMaxIndex(topicID, index); err == raft.ErrNotLeader {
if err := h.Broker.SetTopicMaxIndex(topicID, index, *u); err == raft.ErrNotLeader {
h.redirectToLeader(w, r)
return
} else if err != nil {

View File

@ -161,7 +161,7 @@ func TestHandler_messages_ErrMethodNotAllowed(t *testing.T) {
// Ensure a handler can receive a heartbeats.
func TestHandler_postHeartbeat(t *testing.T) {
var hb HandlerBroker
hb.SetTopicMaxIndexFunc = func(topicID, index uint64) error {
hb.SetTopicMaxIndexFunc = func(topicID, index uint64, dataURL url.URL) error {
if topicID != 1 {
t.Fatalf("unexpected topic id: %d", topicID)
} else if index != 2 {
@ -274,7 +274,7 @@ type HandlerBroker struct {
LeaderURLFunc func() url.URL
PublishFunc func(m *messaging.Message) (uint64, error)
TopicReaderFunc func(topicID, index uint64, streaming bool) io.ReadCloser
SetTopicMaxIndexFunc func(topicID, index uint64) error
SetTopicMaxIndexFunc func(topicID, index uint64, dataURL url.URL) error
}
func (b *HandlerBroker) URLs() []url.URL { return b.URLsFunc() }
@ -284,8 +284,8 @@ func (b *HandlerBroker) Publish(m *messaging.Message) (uint64, error) { return b
func (b *HandlerBroker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser {
return b.TopicReaderFunc(topicID, index, streaming)
}
func (b *HandlerBroker) SetTopicMaxIndex(topicID, index uint64) error {
return b.SetTopicMaxIndexFunc(topicID, index)
func (b *HandlerBroker) SetTopicMaxIndex(topicID, index uint64, dataURL url.URL) error {
return b.SetTopicMaxIndexFunc(topicID, index, dataURL)
}
// MustParseURL parses a string into a URL. Panic on error.

View File

@ -3368,8 +3368,8 @@ type messagingClient struct {
}
// NewMessagingClient returns an instance of MessagingClient.
func NewMessagingClient() MessagingClient {
return &messagingClient{messaging.NewClient()}
func NewMessagingClient(dataURL url.URL) MessagingClient {
return &messagingClient{messaging.NewClient(dataURL)}
}
func (c *messagingClient) Conn(topicID uint64) MessagingConn { return c.Client.Conn(topicID) }
@ -3849,3 +3849,12 @@ func (s *Server) CreateSnapshotWriter() (*SnapshotWriter, error) {
defer s.mu.RUnlock()
return createServerSnapshotWriter(s)
}
func (s *Server) URL() *url.URL {
s.mu.RLock()
defer s.mu.RUnlock()
if n := s.dataNodes[s.id]; n != nil {
return n.URL
}
return &url.URL{}
}

View File

@ -21,7 +21,7 @@ import (
// Ensure the server can be successfully opened and closed.
func TestServer_Open(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := NewServer()
defer s.Close()
@ -41,7 +41,7 @@ func TestServer_Open_ErrPathRequired(t *testing.T) { t.Skip("pending") }
// Ensure the server can create a new data node.
func TestServer_CreateDataNode(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -65,7 +65,7 @@ func TestServer_CreateDataNode(t *testing.T) {
// Ensure the server returns an error when creating a duplicate node.
func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -82,7 +82,7 @@ func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) {
// Ensure the server can delete a node.
func TestServer_DeleteDataNode(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -107,7 +107,7 @@ func TestServer_DeleteDataNode(t *testing.T) {
// Test unuathorized requests logging
func TestServer_UnauthorizedRequests(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -137,7 +137,7 @@ func TestServer_UnauthorizedRequests(t *testing.T) {
// Test user privilege authorization.
func TestServer_UserPrivilegeAuthorization(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -174,7 +174,7 @@ func TestServer_UserPrivilegeAuthorization(t *testing.T) {
// Test single statement query authorization.
func TestServer_SingleStatementQueryAuthorization(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -243,7 +243,7 @@ func TestServer_SingleStatementQueryAuthorization(t *testing.T) {
// Test multiple statement query authorization.
func TestServer_MultiStatementQueryAuthorization(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -290,7 +290,7 @@ func TestServer_MultiStatementQueryAuthorization(t *testing.T) {
// Ensure the server can create a database.
func TestServer_CreateDatabase(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -314,7 +314,7 @@ func TestServer_CreateDatabase(t *testing.T) {
// Ensure the server returns an error when creating a duplicate database.
func TestServer_CreateDatabase_ErrDatabaseExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -330,7 +330,7 @@ func TestServer_CreateDatabase_ErrDatabaseExists(t *testing.T) {
// Ensure the server can drop a database.
func TestServer_DropDatabase(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -358,7 +358,7 @@ func TestServer_DropDatabase(t *testing.T) {
// Ensure the server returns an error when dropping a database that doesn't exist.
func TestServer_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -371,7 +371,7 @@ func TestServer_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
// Ensure the server can return a list of all databases.
func TestServer_Databases(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -393,7 +393,7 @@ func TestServer_Databases(t *testing.T) {
// Ensure the server can create a new user.
func TestServer_CreateUser(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -431,7 +431,7 @@ func TestServer_CreateUser(t *testing.T) {
// Ensure the server correctly detects when there is an admin user.
func TestServer_AdminUserExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -462,7 +462,7 @@ func TestServer_AdminUserExists(t *testing.T) {
// Ensure the server returns an error when creating an user without a name.
func TestServer_CreateUser_ErrUsernameRequired(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -473,7 +473,7 @@ func TestServer_CreateUser_ErrUsernameRequired(t *testing.T) {
// Ensure the server returns an error when creating a duplicate user.
func TestServer_CreateUser_ErrUserExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -487,7 +487,7 @@ func TestServer_CreateUser_ErrUserExists(t *testing.T) {
// Ensure the server can delete an existing user.
func TestServer_DeleteUser(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -514,7 +514,7 @@ func TestServer_DeleteUser(t *testing.T) {
// Ensure the server can return a list of all users.
func TestServer_Users(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -536,7 +536,7 @@ func TestServer_Users(t *testing.T) {
// Ensure the server does not return non-existent users
func TestServer_NonExistingUsers(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -559,7 +559,7 @@ func TestServer_NonExistingUsers(t *testing.T) {
// Ensure the database can create a new retention policy.
func TestServer_CreateRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -593,7 +593,7 @@ func TestServer_CreateRetentionPolicy(t *testing.T) {
// Ensure the database can create a new retention policy with infinite duration.
func TestServer_CreateRetentionPolicyInfinite(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -628,7 +628,7 @@ func TestServer_CreateRetentionPolicyInfinite(t *testing.T) {
// Ensure the database can creates a default retention policy.
func TestServer_CreateRetentionPolicyDefault(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -662,7 +662,7 @@ func TestServer_CreateRetentionPolicyDefault(t *testing.T) {
// Ensure the server returns an error when creating a retention policy with an invalid db.
func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -673,7 +673,7 @@ func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
// Ensure the server returns an error when creating a retention policy without a name.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -685,7 +685,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.
// Ensure the server returns an error when creating a duplicate retention policy.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -698,7 +698,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) {
// Ensure the server returns an error when creating a retention policy with a duration less than one hour.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyMinDuration(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -710,7 +710,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyMinDuration(t *testing.T
// Ensure the database can alter an existing retention policy.
func TestServer_AlterRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -795,7 +795,7 @@ func TestServer_AlterRetentionPolicy(t *testing.T) {
// Ensure the server an error is returned if trying to alter a retention policy with a duration too small.
func TestServer_AlterRetentionPolicy_Minduration(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -850,7 +850,7 @@ func TestServer_AlterRetentionPolicy_Minduration(t *testing.T) {
// Ensure the server can delete an existing retention policy.
func TestServer_DeleteRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -878,7 +878,7 @@ func TestServer_DeleteRetentionPolicy(t *testing.T) {
// Ensure the server returns an error when deleting a retention policy on invalid db.
func TestServer_DeleteRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -889,7 +889,7 @@ func TestServer_DeleteRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
// Ensure the server returns an error when deleting a retention policy without a name.
func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -901,7 +901,7 @@ func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.
// Ensure the server returns an error when deleting a non-existent retention policy.
func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -913,7 +913,7 @@ func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
// Ensure the server can set the default retention policy
func TestServer_SetDefaultRetentionPolicy(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -948,7 +948,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) {
// Ensure the server returns an error when setting the default retention policy to a non-existant one.
func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -960,7 +960,7 @@ func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.
// Ensure the server pre-creates shard groups as needed.
func TestServer_PreCreateRetentionPolices(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -997,7 +997,7 @@ func TestServer_PreCreateRetentionPolices(t *testing.T) {
// Ensure the server prohibits a zero check interval for retention policy enforcement.
func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1007,7 +1007,7 @@ func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) {
}
func TestServer_EnforceRetentionPolices(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1044,7 +1044,7 @@ func TestServer_EnforceRetentionPolices(t *testing.T) {
// Ensure the server can drop a measurement.
func TestServer_DropMeasurement(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1107,7 +1107,7 @@ func TestServer_DropMeasurement(t *testing.T) {
// Ensure the server can handles drop measurement if none exists.
func TestServer_DropMeasurementNoneExists(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1138,7 +1138,7 @@ func TestServer_DropMeasurementNoneExists(t *testing.T) {
// Ensure the server can drop a series.
func TestServer_DropSeries(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1182,7 +1182,7 @@ func TestServer_DropSeries(t *testing.T) {
// Ensure the server can drop a series from measurement when more than one shard exists.
func TestServer_DropSeriesFromMeasurement(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1228,7 +1228,7 @@ func TestServer_DropSeriesFromMeasurement(t *testing.T) {
// ensure that the dropped series is gone
// ensure that we can still query: select value from cpu where region=uswest
func TestServer_DropSeriesTagsPreserved(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
@ -1302,7 +1302,7 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) {
// Ensure the server respects limit and offset in show series queries
func TestServer_ShowSeriesLimitOffset(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1365,7 +1365,7 @@ func TestServer_ShowSeriesLimitOffset(t *testing.T) {
}
func TestServer_CreateShardGroupIfNotExist(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1390,7 +1390,7 @@ func TestServer_CreateShardGroupIfNotExist(t *testing.T) {
}
func TestServer_DeleteShardGroup(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1428,7 +1428,7 @@ func TestServer_DeleteShardGroup(t *testing.T) {
/* TODO(benbjohnson): Change test to not expose underlying series ids directly.
func TestServer_Measurements(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1499,7 +1499,7 @@ func TestServer_NormalizeMeasurement(t *testing.T) {
}
// Create server with a variety of databases, retention policies, and measurements
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1550,7 +1550,7 @@ func TestServer_NormalizeQuery(t *testing.T) {
}
// Start server with database & retention policy.
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1572,7 +1572,7 @@ func TestServer_NormalizeQuery(t *testing.T) {
// Ensure the server can create a continuous query
func TestServer_CreateContinuousQuery(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1632,7 +1632,7 @@ func TestServer_CreateContinuousQuery_ErrInfinteLoop(t *testing.T) {
}
func TestServer_DropContinuousQuery(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1691,7 +1691,7 @@ func TestServer_DropContinuousQuery(t *testing.T) {
// Ensure
func TestServer_RunContinuousQueries(t *testing.T) {
t.Skip()
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
@ -1771,7 +1771,7 @@ func TestServer_RunContinuousQueries(t *testing.T) {
// Ensure the server can create a snapshot writer.
func TestServer_CreateSnapshotWriter(t *testing.T) {
c := test.NewMessagingClient()
c := test.NewDefaultMessagingClient()
s := OpenServer(c)
defer s.Close()

View File

@ -9,11 +9,17 @@ import (
"github.com/influxdb/influxdb/messaging"
)
func init() {
// Ensure the broker matches the handler's interface.
_ = messaging.Handler{Broker: messaging.NewBroker()}
}
// MessagingClient represents a test client for the messaging broker.
type MessagingClient struct {
mu sync.Mutex
index uint64 // highest index
conns []*MessagingConn // list of all connections
mu sync.Mutex
index uint64 // highest index
conns []*MessagingConn // list of all connections
dataURL url.URL // clients data node URL
messagesByTopicID map[uint64][]*messaging.Message // message by topic
@ -22,15 +28,22 @@ type MessagingClient struct {
}
// NewMessagingClient returns a new instance of MessagingClient.
func NewMessagingClient() *MessagingClient {
func NewMessagingClient(dataURL url.URL) *MessagingClient {
c := &MessagingClient{
messagesByTopicID: make(map[uint64][]*messaging.Message),
dataURL: dataURL,
}
c.PublishFunc = c.DefaultPublishFunc
c.ConnFunc = c.DefaultConnFunc
return c
}
// NewMessagingClient returns a new instance of MessagingClient.
func NewDefaultMessagingClient() *MessagingClient {
testDataURL, _ := url.Parse("http://localhost:1234/data")
return NewMessagingClient(*testDataURL)
}
func (c *MessagingClient) Open(path string) error { return nil }
// Close closes all open connections.
@ -82,7 +95,7 @@ func (c *MessagingClient) DefaultConnFunc(topicID uint64) influxdb.MessagingConn
defer c.mu.Unlock()
// Create new connection.
conn := NewMessagingConn(topicID)
conn := NewMessagingConn(topicID, c.dataURL)
// Track connections.
c.conns = append(c.conns, conn)
@ -111,13 +124,15 @@ type MessagingConn struct {
mu sync.Mutex
topicID uint64
index uint64
dataURL url.URL
c chan *messaging.Message
}
// NewMessagingConn returns a new instance of MessagingConn.
func NewMessagingConn(topicID uint64) *MessagingConn {
func NewMessagingConn(topicID uint64, dataURL url.URL) *MessagingConn {
return &MessagingConn{
topicID: topicID,
dataURL: dataURL,
}
}