diff --git a/CHANGELOG.md b/CHANGELOG.md index d9b2ccb6cc..16c693e0d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,11 @@ ## v0.9.0-rc20 [unreleased] +### Features +- [#2128](https://github.com/influxdb/influxdb/pull/2128): Data node discovery from brokers + ### Bugfixes - [#2147](https://github.com/influxdb/influxdb/pull/2147): Set Go Max procs in a better location -- +- ## v0.9.0-rc19 [2015-04-01] ### Features diff --git a/cmd/influxd/restore_test.go b/cmd/influxd/restore_test.go index b31680bc47..8cc44e8e9b 100644 --- a/cmd/influxd/restore_test.go +++ b/cmd/influxd/restore_test.go @@ -16,6 +16,10 @@ import ( // Ensure the restore command can expand a snapshot and bootstrap a broker. func TestRestoreCommand(t *testing.T) { + if testing.Short() { + t.Skip("skipping TestRestoreCommand") + } + now := time.Now() // Create root path to server. @@ -51,7 +55,7 @@ func TestRestoreCommand(t *testing.T) { } // Start server. - b, s := main.Run(c, "", "x.x") + b, s, l := main.Run(c, "", "x.x") if b == nil { t.Fatal("cannot run broker") } else if s == nil { @@ -84,6 +88,7 @@ func TestRestoreCommand(t *testing.T) { f.Close() // Stop server. + l.Close() s.Close() b.Close() @@ -105,7 +110,7 @@ func TestRestoreCommand(t *testing.T) { } // Restart server. - b, s = main.Run(c, "", "x.x") + b, s, l = main.Run(c, "", "x.x") if b == nil { t.Fatal("cannot run broker") } else if s == nil { @@ -135,6 +140,11 @@ func TestRestoreCommand(t *testing.T) { } else if !reflect.DeepEqual(v, map[string]interface{}{"value": float64(1000)}) { t.Fatalf("read series(1) mismatch: %#v", v) } + + // Stop server. + l.Close() + s.Close() + b.Close() } // RestoreCommand is a test wrapper for main.RestoreCommand. diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 6cba7eb3a3..2297cee2cb 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -23,7 +23,7 @@ import ( "github.com/influxdb/influxdb/udp" ) -func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Server) { +func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Server, *raft.Log) { log.Printf("influxdb started, version %s, commit %s", version, commit) var initBroker, initServer bool @@ -217,7 +217,7 @@ func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Ser } } - return b.Broker, s + return b.Broker, s, l } // write the current process id to a file specified by path. @@ -327,7 +327,7 @@ func openServer(config *Config, b *influxdb.Broker, initServer, initBroker bool, } // Create messaging client to the brokers. - c := influxdb.NewMessagingClient() + c := influxdb.NewMessagingClient(config.DataURL()) if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile)); err != nil { log.Fatalf("messaging client error: %s", err) } diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index efebc3ff1d..84ce3e2436 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -19,6 +19,7 @@ import ( "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/messaging" + "github.com/influxdb/influxdb/raft" "github.com/influxdb/influxdb/client" main "github.com/influxdb/influxdb/cmd/influxd" @@ -59,6 +60,7 @@ func rewriteDbRp(old, database, retention string) string { type Node struct { broker *messaging.Broker server *influxdb.Server + log *raft.Log url *url.URL leader bool } @@ -66,6 +68,26 @@ type Node struct { // Cluster represents a multi-node cluster. type Cluster []*Node +func (c *Cluster) Close() { + for _, n := range *c { + if n.log != nil { + n.log.Close() + n.log = nil + } + + if n.server != nil { + n.server.Close() + n.server = nil + } + + if n.broker != nil { + n.broker.Close() + n.broker = nil + } + + } +} + // createCombinedNodeCluster creates a cluster of nServers nodes, each of which // runs as both a Broker and Data node. If any part cluster creation fails, // the testing is marked as failed. @@ -102,7 +124,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba c.ReportingDisabled = true c.Snapshot.Enabled = false - b, s := main.Run(c, "", "x.x") + b, s, l := main.Run(c, "", "x.x") if b == nil { t.Fatalf("Test %s: failed to create broker on port %d", testName, basePort) } @@ -112,6 +134,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba nodes = append(nodes, &Node{ broker: b, server: s, + log: l, url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(basePort)}, leader: true, }) @@ -124,7 +147,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba c.Broker.Port = nextPort c.Data.Port = nextPort - b, s := main.Run(c, "http://localhost:"+strconv.Itoa(basePort), "x.x") + b, s, l := main.Run(c, "http://localhost:"+strconv.Itoa(basePort), "x.x") if b == nil { t.Fatalf("Test %s: failed to create following broker on port %d", testName, basePort) } @@ -135,6 +158,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba nodes = append(nodes, &Node{ broker: b, server: s, + log: l, url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(nextPort)}, }) } @@ -1243,6 +1267,7 @@ func TestSingleServer(t *testing.T) { }() nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090, nil) + defer nodes.Close() runTestsData(t, testName, nodes, "mydb", "myrp") runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp") diff --git a/httpd/handler_test.go b/httpd/handler_test.go index ba3ad147bd..2840e0ded0 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -139,7 +139,7 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) { // Ensure that even if a measurement is not found, that the status code is still 200 func TestHandler_ShowMeasurementsNotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -163,7 +163,7 @@ func TestHandler_ShowMeasurementsNotFound(t *testing.T) { } func TestHandler_CreateDatabase(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -178,7 +178,7 @@ func TestHandler_CreateDatabase(t *testing.T) { } func TestHandler_CreateDatabase_BadRequest_NoName(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -191,7 +191,7 @@ func TestHandler_CreateDatabase_BadRequest_NoName(t *testing.T) { } func TestHandler_CreateDatabase_Conflict(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -207,7 +207,7 @@ func TestHandler_CreateDatabase_Conflict(t *testing.T) { } func TestHandler_DropDatabase(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -223,7 +223,7 @@ func TestHandler_DropDatabase(t *testing.T) { } func TestHandler_DropDatabase_NotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -238,7 +238,7 @@ func TestHandler_DropDatabase_NotFound(t *testing.T) { } func TestHandler_RetentionPolicies(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -256,7 +256,7 @@ func TestHandler_RetentionPolicies(t *testing.T) { } func TestHandler_RetentionPolicies_DatabaseNotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -272,7 +272,7 @@ func TestHandler_RetentionPolicies_DatabaseNotFound(t *testing.T) { } func TestHandler_CreateRetentionPolicy(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -290,7 +290,7 @@ func TestHandler_CreateRetentionPolicy(t *testing.T) { } func TestHandler_CreateRetentionPolicyAsDefault(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -315,7 +315,7 @@ func TestHandler_CreateRetentionPolicyAsDefault(t *testing.T) { } func TestHandler_CreateRetentionPolicy_DatabaseNotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -330,7 +330,7 @@ func TestHandler_CreateRetentionPolicy_DatabaseNotFound(t *testing.T) { } func TestHandler_CreateRetentionPolicy_Conflict(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -348,7 +348,7 @@ func TestHandler_CreateRetentionPolicy_Conflict(t *testing.T) { } func TestHandler_CreateRetentionPolicy_BadRequest(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -364,7 +364,7 @@ func TestHandler_CreateRetentionPolicy_BadRequest(t *testing.T) { } func TestHandler_UpdateRetentionPolicy(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -396,7 +396,7 @@ func TestHandler_UpdateRetentionPolicy(t *testing.T) { } func TestHandler_UpdateRetentionPolicy_BadRequest(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -414,7 +414,7 @@ func TestHandler_UpdateRetentionPolicy_BadRequest(t *testing.T) { } func TestHandler_UpdateRetentionPolicy_DatabaseNotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -430,7 +430,7 @@ func TestHandler_UpdateRetentionPolicy_DatabaseNotFound(t *testing.T) { } func TestHandler_UpdateRetentionPolicy_NotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -448,7 +448,7 @@ func TestHandler_UpdateRetentionPolicy_NotFound(t *testing.T) { } func TestHandler_DeleteRetentionPolicy(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -467,7 +467,7 @@ func TestHandler_DeleteRetentionPolicy(t *testing.T) { } func TestHandler_DeleteRetentionPolicy_DatabaseNotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -484,7 +484,7 @@ func TestHandler_DeleteRetentionPolicy_DatabaseNotFound(t *testing.T) { } func TestHandler_DeleteRetentionPolicy_NotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -502,7 +502,7 @@ func TestHandler_DeleteRetentionPolicy_NotFound(t *testing.T) { } func TestHandler_GzipEnabled(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -528,7 +528,7 @@ func TestHandler_GzipEnabled(t *testing.T) { } func TestHandler_GzipDisabled(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -554,7 +554,7 @@ func TestHandler_GzipDisabled(t *testing.T) { } func TestHandler_Index(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -572,7 +572,7 @@ func TestHandler_Index(t *testing.T) { } func TestHandler_Wait(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -590,7 +590,7 @@ func TestHandler_Wait(t *testing.T) { } func TestHandler_WaitIncrement(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -610,7 +610,7 @@ func TestHandler_WaitIncrement(t *testing.T) { } func TestHandler_WaitNoIndexSpecified(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -624,7 +624,7 @@ func TestHandler_WaitNoIndexSpecified(t *testing.T) { } func TestHandler_WaitInvalidIndexSpecified(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -638,7 +638,7 @@ func TestHandler_WaitInvalidIndexSpecified(t *testing.T) { } func TestHandler_WaitExpectTimeout(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -652,7 +652,7 @@ func TestHandler_WaitExpectTimeout(t *testing.T) { } func TestHandler_Ping(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -666,7 +666,7 @@ func TestHandler_Ping(t *testing.T) { } func TestHandler_PingHead(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -680,7 +680,7 @@ func TestHandler_PingHead(t *testing.T) { } func TestHandler_Users_MultipleUsers(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateUser("jdoe", "1337", false) @@ -700,7 +700,7 @@ func TestHandler_Users_MultipleUsers(t *testing.T) { func TestHandler_UpdateUser(t *testing.T) { t.Skip() - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateUser("jdoe", "1337", false) @@ -723,7 +723,7 @@ func TestHandler_UpdateUser(t *testing.T) { func TestHandler_UpdateUser_PasswordBadRequest(t *testing.T) { t.Skip() - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateUser("jdoe", "1337", false) @@ -740,7 +740,7 @@ func TestHandler_UpdateUser_PasswordBadRequest(t *testing.T) { func TestHandler_DataNodes(t *testing.T) { t.Skip() - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenUninitializedServer(c) srvr.CreateDataNode(MustParseURL("http://localhost:1000")) @@ -759,7 +759,7 @@ func TestHandler_DataNodes(t *testing.T) { func TestHandler_CreateDataNode(t *testing.T) { t.Skip() - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenUninitializedServer(c) s := NewHTTPServer(srvr) @@ -775,7 +775,7 @@ func TestHandler_CreateDataNode(t *testing.T) { func TestHandler_CreateDataNode_BadRequest(t *testing.T) { t.Skip() - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -791,7 +791,7 @@ func TestHandler_CreateDataNode_BadRequest(t *testing.T) { func TestHandler_CreateDataNode_InternalServerError(t *testing.T) { t.Skip() - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -807,7 +807,7 @@ func TestHandler_CreateDataNode_InternalServerError(t *testing.T) { func TestHandler_DeleteDataNode(t *testing.T) { t.Skip() - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDataNode(MustParseURL("http://localhost:1000")) @@ -824,7 +824,7 @@ func TestHandler_DeleteDataNode(t *testing.T) { func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) { t.Skip() - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -841,7 +841,7 @@ func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) { // Perform a subset of endpoint testing, with authentication enabled. func TestHandler_AuthenticatedCreateAdminUser(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) s := NewAuthenticatedHTTPServer(srvr) @@ -864,7 +864,7 @@ func TestHandler_AuthenticatedCreateAdminUser(t *testing.T) { } func TestHandler_AuthenticatedDatabases_Unauthorized(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) s := NewAuthenticatedHTTPServer(srvr) @@ -877,7 +877,7 @@ func TestHandler_AuthenticatedDatabases_Unauthorized(t *testing.T) { } func TestHandler_QueryParamenterMissing(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -892,7 +892,7 @@ func TestHandler_QueryParamenterMissing(t *testing.T) { } func TestHandler_AuthenticatedDatabases_AuthorizedQueryParams(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) srvr.CreateUser("lisa", "password", true) @@ -907,7 +907,7 @@ func TestHandler_AuthenticatedDatabases_AuthorizedQueryParams(t *testing.T) { } func TestHandler_AuthenticatedDatabases_UnauthorizedQueryParams(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) srvr.CreateUser("lisa", "password", true) @@ -922,7 +922,7 @@ func TestHandler_AuthenticatedDatabases_UnauthorizedQueryParams(t *testing.T) { } func TestHandler_AuthenticatedDatabases_AuthorizedBasicAuth(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) srvr.CreateUser("lisa", "password", true) @@ -939,7 +939,7 @@ func TestHandler_AuthenticatedDatabases_AuthorizedBasicAuth(t *testing.T) { } func TestHandler_AuthenticatedDatabases_UnauthorizedBasicAuth(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) srvr.CreateUser("lisa", "password", true) @@ -956,7 +956,7 @@ func TestHandler_AuthenticatedDatabases_UnauthorizedBasicAuth(t *testing.T) { } func TestHandler_GrantDBPrivilege(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) // Create a cluster admin that will grant privilege to "john". @@ -995,7 +995,7 @@ func TestHandler_GrantDBPrivilege(t *testing.T) { } func TestHandler_RevokeAdmin(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) // Create a cluster admin that will revoke admin from "john". @@ -1029,7 +1029,7 @@ func TestHandler_RevokeAdmin(t *testing.T) { } func TestHandler_RevokeDBPrivilege(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) // Create a cluster admin that will revoke privilege from "john". @@ -1065,7 +1065,7 @@ func TestHandler_RevokeDBPrivilege(t *testing.T) { } func TestHandler_DropSeries(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -1088,7 +1088,7 @@ func TestHandler_DropSeries(t *testing.T) { } func TestHandler_serveWriteSeries(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -1111,7 +1111,7 @@ func TestHandler_serveWriteSeries(t *testing.T) { } func TestHandler_serveDump(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -1142,7 +1142,7 @@ func TestHandler_serveDump(t *testing.T) { } func TestHandler_serveWriteSeriesWithNoFields(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) srvr.CreateDatabase("foo") @@ -1162,7 +1162,7 @@ func TestHandler_serveWriteSeriesWithNoFields(t *testing.T) { } func TestHandler_serveWriteSeriesWithAuthNilUser(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) srvr.CreateDatabase("foo") @@ -1183,7 +1183,7 @@ func TestHandler_serveWriteSeriesWithAuthNilUser(t *testing.T) { } func TestHandler_serveWriteSeries_noDatabaseExists(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) s := NewHTTPServer(srvr) @@ -1203,7 +1203,7 @@ func TestHandler_serveWriteSeries_noDatabaseExists(t *testing.T) { } func TestHandler_serveWriteSeries_errorHasJsonContentType(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) @@ -1230,7 +1230,7 @@ func TestHandler_serveWriteSeries_errorHasJsonContentType(t *testing.T) { } func TestHandler_serveWriteSeries_queryHasJsonContentType(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -1290,7 +1290,7 @@ func TestHandler_serveWriteSeries_queryHasJsonContentType(t *testing.T) { } func TestHandler_serveWriteSeries_invalidJSON(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) s := NewHTTPServer(srvr) @@ -1309,7 +1309,7 @@ func TestHandler_serveWriteSeries_invalidJSON(t *testing.T) { } func TestHandler_serveWriteSeries_noDatabaseSpecified(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) s := NewHTTPServer(srvr) @@ -1328,7 +1328,7 @@ func TestHandler_serveWriteSeries_noDatabaseSpecified(t *testing.T) { } func TestHandler_serveWriteSeriesNonZeroTime(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -1371,7 +1371,7 @@ func TestHandler_serveWriteSeriesNonZeroTime(t *testing.T) { } func TestHandler_serveWriteSeriesZeroTime(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -1426,7 +1426,7 @@ func TestHandler_serveWriteSeriesZeroTime(t *testing.T) { } func TestHandler_serveWriteSeriesBatch(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -1511,7 +1511,7 @@ func TestHandler_serveWriteSeriesBatch(t *testing.T) { } func TestHandler_serveWriteSeriesFieldTypeConflict(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") @@ -1554,7 +1554,7 @@ func str2iface(strs []string) []interface{} { } func TestHandler_ProcessContinousQueries(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() srvr := OpenAuthenticatedServer(c) s := NewAuthenticatedHTTPServer(srvr) diff --git a/messaging/broker.go b/messaging/broker.go index f647d27df7..7549fe7f45 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -472,37 +472,51 @@ func (b *Broker) TopicReader(topicID, index uint64, streaming bool) io.ReadClose return NewTopicReader(b.TopicPath(topicID), index, streaming) } -// SetTopicMaxIndex updates the highest replicated index for a topic. +// SetTopicMaxIndex updates the highest replicated index for a topic and data URL. // If a higher index is already set on the topic then the call is ignored. // This index is only held in memory and is used for topic segment reclamation. -func (b *Broker) SetTopicMaxIndex(topicID, index uint64) error { +// The higheset replciated index per data URL is tracked separately from the current index +func (b *Broker) SetTopicMaxIndex(topicID, index uint64, u url.URL) error { _, err := b.Publish(&Message{ Type: SetTopicMaxIndexMessageType, - Data: marshalTopicIndex(topicID, index), + Data: marshalTopicIndex(topicID, index, u), }) return err } func (b *Broker) applySetTopicMaxIndex(m *Message) { - topicID, index := unmarshalTopicIndex(m.Data) + topicID, index, u := unmarshalTopicIndex(m.Data) // Set index if it's not already set higher. - t := b.topics[topicID] - if t != nil && t.index < index { - t.index = index + if t := b.topics[topicID]; t != nil { + // Track the highest replicated index per data node URL + t.indexByURL[u] = index + + if t.index < index { + t.index = index + } } } -func marshalTopicIndex(topicID, index uint64) []byte { - b := make([]byte, 16) +func marshalTopicIndex(topicID, index uint64, u url.URL) []byte { + s := []byte(u.String()) + b := make([]byte, 16+2+len(s)) binary.BigEndian.PutUint64(b[0:8], topicID) binary.BigEndian.PutUint64(b[8:16], index) + binary.BigEndian.PutUint16(b[16:18], uint16(len(s))) // URL string length + n := copy(b[18:], s) // URL string + assert(n == len(s), "marshal topic index too short. have %d, expectd %d", n, len(s)) return b } -func unmarshalTopicIndex(b []byte) (topicID, index uint64) { +func unmarshalTopicIndex(b []byte) (topicID, index uint64, u url.URL) { + assert(len(b) >= 18, "unmarshal topic index too short. have %d, expected %d", len(b), 20) topicID = binary.BigEndian.Uint64(b[0:8]) index = binary.BigEndian.Uint64(b[8:16]) + n := binary.BigEndian.Uint16(b[16:18]) // URL length + du, err := url.Parse(string(b[18 : 18+n])) // URL + assert(err == nil, "unmarshal binary error: %s", err) + u = *du return } @@ -619,9 +633,13 @@ const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB type Topic struct { mu sync.Mutex id uint64 // unique identifier - index uint64 // highest index replicated + index uint64 // current index path string // on-disk path + // highest index replicated per data url. The unique set of keys across all topics + // provides a snapshot of the addresses of every data node in a cluster. + indexByURL map[url.URL]uint64 + file *os.File // last segment writer opened bool @@ -632,9 +650,9 @@ type Topic struct { // NewTopic returns a new instance of Topic. func NewTopic(id uint64, path string) *Topic { return &Topic{ - id: id, - path: path, - + id: id, + path: path, + indexByURL: make(map[url.URL]uint64), MaxSegmentSize: DefaultMaxSegmentSize, } } @@ -652,6 +670,13 @@ func (t *Topic) Index() uint64 { return t.index } +// IndexForURL returns the highest index replicated for a given data URL. +func (t *Topic) IndexForURL(u url.URL) uint64 { + t.mu.Lock() + defer t.mu.Unlock() + return t.indexByURL[u] +} + // SegmentPath returns the path to a segment starting with a given log index. func (t *Topic) SegmentPath(index uint64) string { t.mu.Lock() diff --git a/messaging/broker_test.go b/messaging/broker_test.go index 33d07e3006..6fae315c99 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -120,17 +120,26 @@ func TestBroker_Apply_SetMaxTopicIndex(t *testing.T) { t.Fatal("topic not created") } + testDataURL, _ := url.Parse("http://localhost:1234/data") + data := []byte{0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 5} // topicID=20, index=5, + data = append(data, []byte{0, byte(len(testDataURL.String()))}...) // len= + data = append(data, []byte(testDataURL.String())...) + // Set topic #1's index to "2". if err := b.Apply(&messaging.Message{ Index: 2, Type: messaging.SetTopicMaxIndexMessageType, - Data: []byte{0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 5}, // topicID=20, index=5 + Data: data, }); err != nil { t.Fatalf("apply error: %s", err) } + if topic := b.Topic(20); topic.Index() != 5 { t.Fatalf("unexpected topic index: %d", topic.Index()) } + if topic := b.Topic(20); topic.IndexForURL(*testDataURL) != 5 { + t.Fatalf("unexpected topic url index: %d", topic.IndexForURL(*testDataURL)) + } } // Ensure the broker can read from topics after reopening. @@ -232,6 +241,9 @@ func TestBroker_SetTopicMaxIndex(t *testing.T) { b := OpenBroker() defer b.Close() + testDataURL, _ := url.Parse("http://localhost:1234/data") + urlData := []byte{0, byte(len(testDataURL.String()))} // len=26 + urlData = append(urlData, []byte(testDataURL.String())...) // Ensure the appropriate message is sent to the log. b.Log().ApplyFunc = func(data []byte) (uint64, error) { m, _ := messaging.UnmarshalMessage(data) @@ -239,12 +251,14 @@ func TestBroker_SetTopicMaxIndex(t *testing.T) { t.Fatalf("unexpected topic id data: %x", data[0:8]) } else if !bytes.Equal(m.Data[8:16], []byte{0, 0, 0, 0, 0, 0, 0, 2}) { t.Fatalf("unexpected index data: %x", data[8:16]) + } else if !bytes.Equal(m.Data[16:44], urlData) { + t.Fatalf("unexpected url data: %v", m.Data[16:44]) } return 1, nil } // Set the highest replicated topic index. - if err := b.SetTopicMaxIndex(1, 2); err != nil { + if err := b.SetTopicMaxIndex(1, 2, *testDataURL); err != nil { t.Fatal(err) } } diff --git a/messaging/client.go b/messaging/client.go index 9c5977e77f..b1088ae89a 100644 --- a/messaging/client.go +++ b/messaging/client.go @@ -20,19 +20,24 @@ import ( const ( // DefaultReconnectTimeout is the default time to wait between when a broker // stream disconnects and another connection is retried. - DefaultReconnectTimeout = 1000 * time.Millisecond + DefaultReconnectTimeout = 1 * time.Second // DefaultPingInterval is the default time to wait between checks to the broker. - DefaultPingInterval = 1000 * time.Millisecond + DefaultPingInterval = 1 * time.Second + + // DefaultHeartbeatInterval is the default time that a topic subscriber heartbeats + // with a broker + DefaultHeartbeatInterval = 1 * time.Second ) // Client represents a client for the broker's HTTP API. type Client struct { - mu sync.Mutex - path string // config file path - conns []*Conn // all connections opened by client - url url.URL // current known leader URL - urls []url.URL // list of available broker URLs + mu sync.Mutex + path string // config file path + conns []*Conn // all connections opened by client + url url.URL // current known leader URL + urls []url.URL // list of available broker URLs + dataURL url.URL // URL of the client's data node opened bool @@ -50,10 +55,11 @@ type Client struct { } // NewClient returns a new instance of Client with defaults set. -func NewClient() *Client { +func NewClient(dataURL url.URL) *Client { c := &Client{ ReconnectTimeout: DefaultReconnectTimeout, PingInterval: DefaultPingInterval, + dataURL: dataURL, } return c } @@ -340,7 +346,7 @@ func (c *Client) Conn(topicID uint64) *Conn { defer c.mu.Unlock() // Create connection and set current URL. - conn := NewConn(topicID) + conn := NewConn(topicID, &c.dataURL) conn.SetURL(c.url) // Add to list of client connections. @@ -353,11 +359,13 @@ func (c *Client) Conn(topicID uint64) *Conn { func (c *Client) pinger(closing chan struct{}) { defer c.wg.Done() + t := time.NewTicker(c.PingInterval) + defer t.Stop() for { select { case <-closing: return - case <-time.After(c.PingInterval): + case <-t.C: c.Ping() } } @@ -407,6 +415,7 @@ type Conn struct { index uint64 // highest index sent over the channel streaming bool // use streaming reader, if true url url.URL // current broker url + dataURL url.URL // url for the data node or this caller opened bool c chan *Message // channel streams messages from the broker. @@ -417,16 +426,21 @@ type Conn struct { // The amount of time to wait before reconnecting to a broker stream. ReconnectTimeout time.Duration + // The amount of time between heartbeats from data nodes to brokers + HeartbeatInterval time.Duration + // The logging interface used by the connection for out-of-band errors. Logger *log.Logger } // NewConn returns a new connection to the broker for a topic. -func NewConn(topicID uint64) *Conn { +func NewConn(topicID uint64, dataURL *url.URL) *Conn { return &Conn{ - topicID: topicID, - ReconnectTimeout: DefaultReconnectTimeout, - Logger: log.New(os.Stderr, "[messaging] ", log.LstdFlags), + topicID: topicID, + dataURL: *dataURL, + ReconnectTimeout: DefaultReconnectTimeout, + HeartbeatInterval: DefaultHeartbeatInterval, + Logger: log.New(os.Stderr, "[messaging] ", log.LstdFlags), } } @@ -492,10 +506,10 @@ func (c *Conn) Open(index uint64, streaming bool) error { c.c = make(chan *Message, 0) // Start goroutines. - c.wg.Add(1) + c.wg.Add(2) c.closing = make(chan struct{}) go c.streamer(c.closing) - + go c.heartbeater(c.closing) return nil } @@ -531,6 +545,22 @@ func (c *Conn) close() error { return nil } +// heartbeater periodically heartbeats the broker its index +func (c *Conn) heartbeater(closing chan struct{}) { + defer c.wg.Done() + + t := time.NewTicker(c.HeartbeatInterval) + defer t.Stop() + for { + select { + case <-closing: + return + case <-t.C: + c.Heartbeat() + } + } +} + // Heartbeat sends a heartbeat back to the broker with the client's index. func (c *Conn) Heartbeat() error { var resp *http.Response @@ -546,6 +576,7 @@ func (c *Conn) Heartbeat() error { u.RawQuery = url.Values{ "topicID": {strconv.FormatUint(topicID, 10)}, "index": {strconv.FormatUint(index, 10)}, + "url": {c.dataURL.String()}, }.Encode() resp, err = http.Post(u.String(), "application/octet-stream", nil) if err != nil { diff --git a/messaging/client_test.go b/messaging/client_test.go index 8feadacc21..896f77c672 100644 --- a/messaging/client_test.go +++ b/messaging/client_test.go @@ -14,6 +14,12 @@ import ( "github.com/influxdb/influxdb/messaging" ) +var testDataURL *url.URL + +func init() { + testDataURL, _ = url.Parse("http://localhost:1234/data") +} + // Ensure a client can open the configuration file, if it exists. func TestClient_Open_WithConfig(t *testing.T) { // Write configuration file. @@ -477,7 +483,7 @@ func TestClient_Conn(t *testing.T) { // Ensure that an error is returned when opening an opened connection. func TestConn_Open_ErrConnOpen(t *testing.T) { - c := messaging.NewConn(1) + c := messaging.NewConn(1, testDataURL) c.Open(0, false) defer c.Close() if err := c.Open(0, false); err != messaging.ErrConnOpen { @@ -487,7 +493,7 @@ func TestConn_Open_ErrConnOpen(t *testing.T) { // Ensure that an error is returned when opening a previously closed connection. func TestConn_Open_ErrConnCannotReuse(t *testing.T) { - c := messaging.NewConn(1) + c := messaging.NewConn(1, testDataURL) c.Open(0, false) c.Close() if err := c.Open(0, false); err != messaging.ErrConnCannotReuse { @@ -497,7 +503,7 @@ func TestConn_Open_ErrConnCannotReuse(t *testing.T) { // Ensure that an error is returned when closing a closed connection. func TestConn_Close_ErrConnClosed(t *testing.T) { - c := messaging.NewConn(1) + c := messaging.NewConn(1, testDataURL) c.Open(0, false) c.Close() if err := c.Close(); err != messaging.ErrConnClosed { @@ -524,7 +530,7 @@ func TestConn_Open(t *testing.T) { defer s.Close() // Create and open connection to server. - c := messaging.NewConn(100) + c := messaging.NewConn(100, testDataURL) c.SetURL(*MustParseURL(s.URL)) if err := c.Open(200, false); err != nil { t.Fatal(err) @@ -561,7 +567,7 @@ func TestConn_Open_Reconnect(t *testing.T) { defer s.Close() // Create and open connection to server. - c := messaging.NewConn(100) + c := messaging.NewConn(100, testDataURL) c.SetURL(*MustParseURL(s.URL)) if err := c.Open(0, false); err != nil { t.Fatal(err) @@ -590,12 +596,14 @@ func TestConn_Heartbeat(t *testing.T) { t.Fatalf("unexpected topic id: %s", topicID) } else if index := req.URL.Query().Get("index"); index != "200" { t.Fatalf("unexpected index: %s", index) + } else if url := req.URL.Query().Get("url"); url != "http://localhost:1234/data" { + t.Fatalf("unexpected url: %s, got %s", "http://localhost:1234/data", url) } })) defer s.Close() // Create connection and heartbeat. - c := messaging.NewConn(100) + c := messaging.NewConn(100, testDataURL) c.SetURL(*MustParseURL(s.URL)) c.SetIndex(200) if err := c.Heartbeat(); err != nil { @@ -609,7 +617,7 @@ func TestConn_Heartbeat_ErrConnectionRefused(t *testing.T) { s.Close() // Create connection and heartbeat. - c := messaging.NewConn(0) + c := messaging.NewConn(0, testDataURL) c.SetURL(*MustParseURL(s.URL)) if err := c.Heartbeat(); err == nil || !strings.Contains(err.Error(), `connection refused`) { t.Fatalf("unexpected error: %s", err) @@ -625,7 +633,7 @@ func TestConn_Heartbeat_ErrNoLeader(t *testing.T) { defer s.Close() // Create connection and heartbeat. - c := messaging.NewConn(0) + c := messaging.NewConn(0, testDataURL) c.SetURL(*MustParseURL(s.URL)) if err := c.Heartbeat(); err != messaging.ErrNoLeader { t.Fatalf("unexpected error: %s", err) @@ -641,7 +649,7 @@ func TestConn_Heartbeat_ErrBrokerError(t *testing.T) { defer s.Close() // Create connection and heartbeat. - c := messaging.NewConn(0) + c := messaging.NewConn(0, testDataURL) c.SetURL(*MustParseURL(s.URL)) if err := c.Heartbeat(); err == nil || err.Error() != `oh no` { t.Fatalf("unexpected error: %s", err) @@ -656,7 +664,7 @@ func TestConn_Heartbeat_ErrHTTPError(t *testing.T) { defer s.Close() // Create connection and heartbeat. - c := messaging.NewConn(0) + c := messaging.NewConn(0, testDataURL) c.SetURL(*MustParseURL(s.URL)) if err := c.Heartbeat(); err == nil || err.Error() != `heartbeat error: status=500` { t.Fatalf("unexpected error: %s", err) @@ -711,7 +719,7 @@ type Client struct { // NewClient returns an new instance of Client. func NewClient() *Client { - return &Client{messaging.NewClient()} + return &Client{messaging.NewClient(*testDataURL)} } // MustOpen opens the client. Panic on error. diff --git a/messaging/errors.go b/messaging/errors.go index ce87031719..af9dc57df8 100644 --- a/messaging/errors.go +++ b/messaging/errors.go @@ -70,6 +70,9 @@ var ( // ErrReaderClosed is returned when reading from a closed topic reader. ErrReaderClosed = errors.New("reader closed") + // ErrURLRequired is returned when making a call without a url parameter + ErrURLRequired = errors.New("url required") + // ErrMessageDataRequired is returned when publishing a message without data. ErrMessageDataRequired = errors.New("message data required") ) diff --git a/messaging/handler.go b/messaging/handler.go index c6f9fdd3a4..76e132115e 100644 --- a/messaging/handler.go +++ b/messaging/handler.go @@ -21,7 +21,7 @@ type Handler struct { LeaderURL() url.URL TopicReader(topicID, index uint64, streaming bool) io.ReadCloser Publish(m *Message) (uint64, error) - SetTopicMaxIndex(topicID, index uint64) error + SetTopicMaxIndex(topicID, index uint64, u url.URL) error } RaftHandler http.Handler @@ -160,8 +160,13 @@ func (h *Handler) postHeartbeat(w http.ResponseWriter, r *http.Request) { return } + u, err := url.Parse(r.URL.Query().Get("url")) + if err != nil { + h.error(w, ErrURLRequired, http.StatusBadRequest) + } + // Update the topic's highest replicated index. - if err := h.Broker.SetTopicMaxIndex(topicID, index); err == raft.ErrNotLeader { + if err := h.Broker.SetTopicMaxIndex(topicID, index, *u); err == raft.ErrNotLeader { h.redirectToLeader(w, r) return } else if err != nil { diff --git a/messaging/handler_test.go b/messaging/handler_test.go index 198be0dc9c..74b60f94db 100644 --- a/messaging/handler_test.go +++ b/messaging/handler_test.go @@ -161,7 +161,7 @@ func TestHandler_messages_ErrMethodNotAllowed(t *testing.T) { // Ensure a handler can receive a heartbeats. func TestHandler_postHeartbeat(t *testing.T) { var hb HandlerBroker - hb.SetTopicMaxIndexFunc = func(topicID, index uint64) error { + hb.SetTopicMaxIndexFunc = func(topicID, index uint64, dataURL url.URL) error { if topicID != 1 { t.Fatalf("unexpected topic id: %d", topicID) } else if index != 2 { @@ -274,7 +274,7 @@ type HandlerBroker struct { LeaderURLFunc func() url.URL PublishFunc func(m *messaging.Message) (uint64, error) TopicReaderFunc func(topicID, index uint64, streaming bool) io.ReadCloser - SetTopicMaxIndexFunc func(topicID, index uint64) error + SetTopicMaxIndexFunc func(topicID, index uint64, dataURL url.URL) error } func (b *HandlerBroker) URLs() []url.URL { return b.URLsFunc() } @@ -284,8 +284,8 @@ func (b *HandlerBroker) Publish(m *messaging.Message) (uint64, error) { return b func (b *HandlerBroker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser { return b.TopicReaderFunc(topicID, index, streaming) } -func (b *HandlerBroker) SetTopicMaxIndex(topicID, index uint64) error { - return b.SetTopicMaxIndexFunc(topicID, index) +func (b *HandlerBroker) SetTopicMaxIndex(topicID, index uint64, dataURL url.URL) error { + return b.SetTopicMaxIndexFunc(topicID, index, dataURL) } // MustParseURL parses a string into a URL. Panic on error. diff --git a/server.go b/server.go index e3ac76eb64..d285412f0c 100644 --- a/server.go +++ b/server.go @@ -3368,8 +3368,8 @@ type messagingClient struct { } // NewMessagingClient returns an instance of MessagingClient. -func NewMessagingClient() MessagingClient { - return &messagingClient{messaging.NewClient()} +func NewMessagingClient(dataURL url.URL) MessagingClient { + return &messagingClient{messaging.NewClient(dataURL)} } func (c *messagingClient) Conn(topicID uint64) MessagingConn { return c.Client.Conn(topicID) } @@ -3849,3 +3849,12 @@ func (s *Server) CreateSnapshotWriter() (*SnapshotWriter, error) { defer s.mu.RUnlock() return createServerSnapshotWriter(s) } + +func (s *Server) URL() *url.URL { + s.mu.RLock() + defer s.mu.RUnlock() + if n := s.dataNodes[s.id]; n != nil { + return n.URL + } + return &url.URL{} +} diff --git a/server_test.go b/server_test.go index 033de36f39..28b8c29bdd 100644 --- a/server_test.go +++ b/server_test.go @@ -21,7 +21,7 @@ import ( // Ensure the server can be successfully opened and closed. func TestServer_Open(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := NewServer() defer s.Close() @@ -41,7 +41,7 @@ func TestServer_Open_ErrPathRequired(t *testing.T) { t.Skip("pending") } // Ensure the server can create a new data node. func TestServer_CreateDataNode(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -65,7 +65,7 @@ func TestServer_CreateDataNode(t *testing.T) { // Ensure the server returns an error when creating a duplicate node. func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -82,7 +82,7 @@ func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) { // Ensure the server can delete a node. func TestServer_DeleteDataNode(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -107,7 +107,7 @@ func TestServer_DeleteDataNode(t *testing.T) { // Test unuathorized requests logging func TestServer_UnauthorizedRequests(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -137,7 +137,7 @@ func TestServer_UnauthorizedRequests(t *testing.T) { // Test user privilege authorization. func TestServer_UserPrivilegeAuthorization(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -174,7 +174,7 @@ func TestServer_UserPrivilegeAuthorization(t *testing.T) { // Test single statement query authorization. func TestServer_SingleStatementQueryAuthorization(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -243,7 +243,7 @@ func TestServer_SingleStatementQueryAuthorization(t *testing.T) { // Test multiple statement query authorization. func TestServer_MultiStatementQueryAuthorization(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -290,7 +290,7 @@ func TestServer_MultiStatementQueryAuthorization(t *testing.T) { // Ensure the server can create a database. func TestServer_CreateDatabase(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -314,7 +314,7 @@ func TestServer_CreateDatabase(t *testing.T) { // Ensure the server returns an error when creating a duplicate database. func TestServer_CreateDatabase_ErrDatabaseExists(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -330,7 +330,7 @@ func TestServer_CreateDatabase_ErrDatabaseExists(t *testing.T) { // Ensure the server can drop a database. func TestServer_DropDatabase(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -358,7 +358,7 @@ func TestServer_DropDatabase(t *testing.T) { // Ensure the server returns an error when dropping a database that doesn't exist. func TestServer_DropDatabase_ErrDatabaseNotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -371,7 +371,7 @@ func TestServer_DropDatabase_ErrDatabaseNotFound(t *testing.T) { // Ensure the server can return a list of all databases. func TestServer_Databases(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -393,7 +393,7 @@ func TestServer_Databases(t *testing.T) { // Ensure the server can create a new user. func TestServer_CreateUser(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -431,7 +431,7 @@ func TestServer_CreateUser(t *testing.T) { // Ensure the server correctly detects when there is an admin user. func TestServer_AdminUserExists(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -462,7 +462,7 @@ func TestServer_AdminUserExists(t *testing.T) { // Ensure the server returns an error when creating an user without a name. func TestServer_CreateUser_ErrUsernameRequired(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -473,7 +473,7 @@ func TestServer_CreateUser_ErrUsernameRequired(t *testing.T) { // Ensure the server returns an error when creating a duplicate user. func TestServer_CreateUser_ErrUserExists(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -487,7 +487,7 @@ func TestServer_CreateUser_ErrUserExists(t *testing.T) { // Ensure the server can delete an existing user. func TestServer_DeleteUser(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -514,7 +514,7 @@ func TestServer_DeleteUser(t *testing.T) { // Ensure the server can return a list of all users. func TestServer_Users(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -536,7 +536,7 @@ func TestServer_Users(t *testing.T) { // Ensure the server does not return non-existent users func TestServer_NonExistingUsers(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -559,7 +559,7 @@ func TestServer_NonExistingUsers(t *testing.T) { // Ensure the database can create a new retention policy. func TestServer_CreateRetentionPolicy(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -593,7 +593,7 @@ func TestServer_CreateRetentionPolicy(t *testing.T) { // Ensure the database can create a new retention policy with infinite duration. func TestServer_CreateRetentionPolicyInfinite(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -628,7 +628,7 @@ func TestServer_CreateRetentionPolicyInfinite(t *testing.T) { // Ensure the database can creates a default retention policy. func TestServer_CreateRetentionPolicyDefault(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -662,7 +662,7 @@ func TestServer_CreateRetentionPolicyDefault(t *testing.T) { // Ensure the server returns an error when creating a retention policy with an invalid db. func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -673,7 +673,7 @@ func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { // Ensure the server returns an error when creating a retention policy without a name. func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -685,7 +685,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing. // Ensure the server returns an error when creating a duplicate retention policy. func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -698,7 +698,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) { // Ensure the server returns an error when creating a retention policy with a duration less than one hour. func TestServer_CreateRetentionPolicy_ErrRetentionPolicyMinDuration(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -710,7 +710,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyMinDuration(t *testing.T // Ensure the database can alter an existing retention policy. func TestServer_AlterRetentionPolicy(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -795,7 +795,7 @@ func TestServer_AlterRetentionPolicy(t *testing.T) { // Ensure the server an error is returned if trying to alter a retention policy with a duration too small. func TestServer_AlterRetentionPolicy_Minduration(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -850,7 +850,7 @@ func TestServer_AlterRetentionPolicy_Minduration(t *testing.T) { // Ensure the server can delete an existing retention policy. func TestServer_DeleteRetentionPolicy(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -878,7 +878,7 @@ func TestServer_DeleteRetentionPolicy(t *testing.T) { // Ensure the server returns an error when deleting a retention policy on invalid db. func TestServer_DeleteRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -889,7 +889,7 @@ func TestServer_DeleteRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { // Ensure the server returns an error when deleting a retention policy without a name. func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -901,7 +901,7 @@ func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing. // Ensure the server returns an error when deleting a non-existent retention policy. func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -913,7 +913,7 @@ func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) { // Ensure the server can set the default retention policy func TestServer_SetDefaultRetentionPolicy(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -948,7 +948,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) { // Ensure the server returns an error when setting the default retention policy to a non-existant one. func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -960,7 +960,7 @@ func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing. // Ensure the server pre-creates shard groups as needed. func TestServer_PreCreateRetentionPolices(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -997,7 +997,7 @@ func TestServer_PreCreateRetentionPolices(t *testing.T) { // Ensure the server prohibits a zero check interval for retention policy enforcement. func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1007,7 +1007,7 @@ func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) { } func TestServer_EnforceRetentionPolices(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1044,7 +1044,7 @@ func TestServer_EnforceRetentionPolices(t *testing.T) { // Ensure the server can drop a measurement. func TestServer_DropMeasurement(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1107,7 +1107,7 @@ func TestServer_DropMeasurement(t *testing.T) { // Ensure the server can handles drop measurement if none exists. func TestServer_DropMeasurementNoneExists(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1138,7 +1138,7 @@ func TestServer_DropMeasurementNoneExists(t *testing.T) { // Ensure the server can drop a series. func TestServer_DropSeries(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1182,7 +1182,7 @@ func TestServer_DropSeries(t *testing.T) { // Ensure the server can drop a series from measurement when more than one shard exists. func TestServer_DropSeriesFromMeasurement(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1228,7 +1228,7 @@ func TestServer_DropSeriesFromMeasurement(t *testing.T) { // ensure that the dropped series is gone // ensure that we can still query: select value from cpu where region=uswest func TestServer_DropSeriesTagsPreserved(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1302,7 +1302,7 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) { // Ensure the server respects limit and offset in show series queries func TestServer_ShowSeriesLimitOffset(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1365,7 +1365,7 @@ func TestServer_ShowSeriesLimitOffset(t *testing.T) { } func TestServer_CreateShardGroupIfNotExist(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1390,7 +1390,7 @@ func TestServer_CreateShardGroupIfNotExist(t *testing.T) { } func TestServer_DeleteShardGroup(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1428,7 +1428,7 @@ func TestServer_DeleteShardGroup(t *testing.T) { /* TODO(benbjohnson): Change test to not expose underlying series ids directly. func TestServer_Measurements(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1499,7 +1499,7 @@ func TestServer_NormalizeMeasurement(t *testing.T) { } // Create server with a variety of databases, retention policies, and measurements - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1550,7 +1550,7 @@ func TestServer_NormalizeQuery(t *testing.T) { } // Start server with database & retention policy. - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1572,7 +1572,7 @@ func TestServer_NormalizeQuery(t *testing.T) { // Ensure the server can create a continuous query func TestServer_CreateContinuousQuery(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1632,7 +1632,7 @@ func TestServer_CreateContinuousQuery_ErrInfinteLoop(t *testing.T) { } func TestServer_DropContinuousQuery(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1691,7 +1691,7 @@ func TestServer_DropContinuousQuery(t *testing.T) { // Ensure func TestServer_RunContinuousQueries(t *testing.T) { t.Skip() - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1771,7 +1771,7 @@ func TestServer_RunContinuousQueries(t *testing.T) { // Ensure the server can create a snapshot writer. func TestServer_CreateSnapshotWriter(t *testing.T) { - c := test.NewMessagingClient() + c := test.NewDefaultMessagingClient() s := OpenServer(c) defer s.Close() diff --git a/test/messaging.go b/test/messaging.go index b8123edb38..9de8bf2458 100644 --- a/test/messaging.go +++ b/test/messaging.go @@ -9,11 +9,17 @@ import ( "github.com/influxdb/influxdb/messaging" ) +func init() { + // Ensure the broker matches the handler's interface. + _ = messaging.Handler{Broker: messaging.NewBroker()} +} + // MessagingClient represents a test client for the messaging broker. type MessagingClient struct { - mu sync.Mutex - index uint64 // highest index - conns []*MessagingConn // list of all connections + mu sync.Mutex + index uint64 // highest index + conns []*MessagingConn // list of all connections + dataURL url.URL // clients data node URL messagesByTopicID map[uint64][]*messaging.Message // message by topic @@ -22,15 +28,22 @@ type MessagingClient struct { } // NewMessagingClient returns a new instance of MessagingClient. -func NewMessagingClient() *MessagingClient { +func NewMessagingClient(dataURL url.URL) *MessagingClient { c := &MessagingClient{ messagesByTopicID: make(map[uint64][]*messaging.Message), + dataURL: dataURL, } c.PublishFunc = c.DefaultPublishFunc c.ConnFunc = c.DefaultConnFunc return c } +// NewMessagingClient returns a new instance of MessagingClient. +func NewDefaultMessagingClient() *MessagingClient { + testDataURL, _ := url.Parse("http://localhost:1234/data") + return NewMessagingClient(*testDataURL) +} + func (c *MessagingClient) Open(path string) error { return nil } // Close closes all open connections. @@ -82,7 +95,7 @@ func (c *MessagingClient) DefaultConnFunc(topicID uint64) influxdb.MessagingConn defer c.mu.Unlock() // Create new connection. - conn := NewMessagingConn(topicID) + conn := NewMessagingConn(topicID, c.dataURL) // Track connections. c.conns = append(c.conns, conn) @@ -111,13 +124,15 @@ type MessagingConn struct { mu sync.Mutex topicID uint64 index uint64 + dataURL url.URL c chan *messaging.Message } // NewMessagingConn returns a new instance of MessagingConn. -func NewMessagingConn(topicID uint64) *MessagingConn { +func NewMessagingConn(topicID uint64, dataURL url.URL) *MessagingConn { return &MessagingConn{ topicID: topicID, + dataURL: dataURL, } }