diff --git a/broker.go b/broker.go index 1060687e61..2c5fb3cf61 100644 --- a/broker.go +++ b/broker.go @@ -1,10 +1,6 @@ package influxdb -/* import ( - "fmt" - "log" - "net/http" "time" "github.com/influxdb/influxdb/messaging" @@ -31,7 +27,7 @@ type Broker struct { done chan struct{} // send CQ processing requests to the same data node - currentCQProcessingNode *messaging.Replica + // currentCQProcessingNode *messaging.Replica // FIX(benbjohnson) // variables to control when to trigger processing and when to timeout TriggerInterval time.Duration @@ -51,10 +47,14 @@ func NewBroker() *Broker { // RunContinuousQueryLoop starts running continuous queries on a background goroutine. func (b *Broker) RunContinuousQueryLoop() { - b.done = make(chan struct{}) - go b.continuousQueryLoop(b.done) + // FIX(benbjohnson) + // b.done = make(chan struct{}) + // go b.continuousQueryLoop(b.done) } +/* + + // Close closes the broker. func (b *Broker) Close() error { if b.done != nil { diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 7485dda029..4452654e98 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -180,8 +180,8 @@ func (c *Config) DataAddrUDP() string { } // DataURL returns the URL required to contact the data server. -func (c *Config) DataURL() *url.URL { - return &url.URL{ +func (c *Config) DataURL() url.URL { + return url.URL{ Scheme: "http", Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Data.Port)), } @@ -193,8 +193,8 @@ func (c *Config) BrokerAddr() string { } // BrokerURL returns the URL required to contact the Broker server. -func (c *Config) BrokerURL() *url.URL { - return &url.URL{ +func (c *Config) BrokerURL() url.URL { + return url.URL{ Scheme: "http", Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Broker.Port)), } diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 712177c20d..38671ce3e6 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -20,6 +20,7 @@ import ( "github.com/influxdb/influxdb/graphite" "github.com/influxdb/influxdb/httpd" "github.com/influxdb/influxdb/messaging" + "github.com/influxdb/influxdb/raft" "github.com/influxdb/influxdb/udp" ) @@ -34,20 +35,26 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B initializing := !fileExists(config.BrokerDir()) && !fileExists(config.DataDir()) // Parse join urls from the --join flag. - var joinURLs []*url.URL + var joinURLs []url.URL if join == "" { joinURLs = parseURLs(config.JoinURLs()) } else { joinURLs = parseURLs(join) } - // Open broker, initialize or join as necessary. - b := openBroker(config.BrokerDir(), config.BrokerURL(), initializing, joinURLs, logWriter) + // Open broker & raft log, initialize or join as necessary. + b, l := openBroker(config.BrokerDir(), config.BrokerURL(), initializing, joinURLs, logWriter) // Start the broker handler. var h *Handler if b != nil { - h = &Handler{brokerHandler: messaging.NewHandler(b.Broker)} + h = &Handler{ + brokerHandler: &messaging.Handler{ + Broker: b.Broker, + RaftHandler: &raft.Handler{Log: l}, + }, + } + // We want to make sure we are spun up before we exit this function, so we manually listen and serve listener, err := net.Listen("tcp", config.BrokerAddr()) if err != nil { @@ -158,7 +165,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B // unless disabled, start the loop to report anonymous usage stats every 24h if !config.ReportingDisabled { - clusterID := b.Broker.Log().Config().ClusterID + clusterID := b.Broker.ClusterID() go s.StartReportingLoop(version, clusterID) } @@ -206,57 +213,67 @@ func parseConfig(path, hostname string) *Config { } // creates and initializes a broker. -func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL, w io.Writer) *influxdb.Broker { +func openBroker(path string, u url.URL, initializing bool, joinURLs []url.URL, w io.Writer) (*influxdb.Broker, *raft.Log) { // Ignore if there's no existing broker and we're not initializing or joining. if !fileExists(path) && !initializing && len(joinURLs) == 0 { - return nil + return nil, nil } + // Create raft log. + l := raft.NewLog() + l.SetURL(u) + l.SetLogOutput(w) + // Create broker. b := influxdb.NewBroker() + b.Log = l b.SetLogOutput(w) - if err := b.Open(path, u); err != nil { + // Open broker so it can feed last index data to the log. + if err := b.Open(path); err != nil { log.Fatalf("failed to open broker: %s", err) } - // If this is a new broker then we can initialie two ways: + // Attach the broker as the finite state machine of the raft log. + l.FSM = &messaging.RaftFSM{Broker: b} + + // Open raft log inside broker directory. + if err := l.Open(filepath.Join(path, "raft")); err != nil { + log.Fatalf("raft: %s", err) + } + + // If this is a new raft log then we can initialie two ways: // 1) Start a brand new cluster. // 2) Join an existing cluster. if initializing { if len(joinURLs) == 0 { - initializeBroker(b) + if err := l.Initialize(); err != nil { + log.Fatalf("initialize raft log: %s", err) + } } else { - joinBroker(b, joinURLs) + joinLog(l, joinURLs) } } - return b + return b, l } -// initializes a new broker. -func initializeBroker(b *influxdb.Broker) { - if err := b.Initialize(); err != nil { - log.Fatalf("initialize: %s", err) - } -} - -// joins a broker to an existing cluster. -func joinBroker(b *influxdb.Broker, joinURLs []*url.URL) { +// joins a raft log to an existing cluster. +func joinLog(l *raft.Log, joinURLs []url.URL) { // Attempts to join each server until successful. for _, u := range joinURLs { - if err := b.Join(u); err != nil { - log.Printf("join: failed to connect to broker: %s: %s", u, err) + if err := l.Join(u); err != nil { + log.Printf("join: failed to connect to raft cluster: %s: %s", u, err) } else { - log.Printf("join: connected broker to %s", u) + log.Printf("join: connected raft log to %s", u) return } } - log.Fatalf("join: failed to connect broker to any specified server") + log.Fatalf("join: failed to connect raft log to any specified server") } // creates and initializes a server. -func openServer(config *Config, b *influxdb.Broker, initializing, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server { +func openServer(config *Config, b *influxdb.Broker, initializing, configExists bool, joinURLs []url.URL, w io.Writer) *influxdb.Server { // Ignore if there's no existing server and we're not initializing or joining. if !fileExists(config.Data.Dir) && !initializing && len(joinURLs) == 0 { return nil @@ -286,13 +303,13 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b } else if !configExists { // We are spining up a server that has no config, // but already has an initialized data directory - joinURLs = []*url.URL{b.URL()} + joinURLs = []url.URL{b.URL()} openServerClient(s, joinURLs, w) } else { if len(joinURLs) == 0 { // If a config exists, but no joinUrls are specified, fall back to the broker URL // TODO: Make sure we have a leader, and then spin up the server - joinURLs = []*url.URL{b.URL()} + joinURLs = []url.URL{b.URL()} } openServerClient(s, joinURLs, w) } @@ -301,18 +318,13 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b } // initializes a new server that does not yet have an ID. -func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer) { +func initializeServer(u url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer) { // TODO: Create replica using the messaging client. - // Create replica on broker. - if err := b.CreateReplica(1, u); err != nil { - log.Fatalf("replica creation error: %s", err) - } - // Create messaging client. - c := messaging.NewClient(1) + c := influxdb.NewMessagingClient() c.SetLogOutput(w) - if err := c.Open(filepath.Join(s.Path(), messagingClientFile), []*url.URL{b.URL()}); err != nil { + if err := c.Open(filepath.Join(s.Path(), messagingClientFile), []url.URL{b.URL()}); err != nil { log.Fatalf("messaging client error: %s", err) } if err := s.SetClient(c); err != nil { @@ -326,12 +338,12 @@ func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.W } // joins a server to an existing cluster. -func joinServer(s *influxdb.Server, u *url.URL, joinURLs []*url.URL) { +func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) { // TODO: Use separate broker and data join urls. // Create data node on an existing data node. for _, joinURL := range joinURLs { - if err := s.Join(u, joinURL); err != nil { + if err := s.Join(&u, &joinURL); err != nil { log.Printf("join: failed to connect data node: %s: %s", u, err) } else { log.Printf("join: connected data node to %s", u) @@ -342,8 +354,8 @@ func joinServer(s *influxdb.Server, u *url.URL, joinURLs []*url.URL) { } // opens the messaging client and attaches it to the server. -func openServerClient(s *influxdb.Server, joinURLs []*url.URL, w io.Writer) { - c := messaging.NewClient(s.ID()) +func openServerClient(s *influxdb.Server, joinURLs []url.URL, w io.Writer) { + c := influxdb.NewMessagingClient() c.SetLogOutput(w) if err := c.Open(filepath.Join(s.Path(), messagingClientFile), joinURLs); err != nil { log.Fatalf("messaging client error: %s", err) @@ -354,7 +366,7 @@ func openServerClient(s *influxdb.Server, joinURLs []*url.URL, w io.Writer) { } // parses a comma-delimited list of URLs. -func parseURLs(s string) (a []*url.URL) { +func parseURLs(s string) (a []url.URL) { if s == "" { return nil } @@ -364,7 +376,7 @@ func parseURLs(s string) (a []*url.URL) { if err != nil { log.Fatalf("cannot parse urls: %s", err) } - a = append(a, u) + a = append(a, *u) } return } diff --git a/httpd/handler.go b/httpd/handler.go index 2cb601e2bb..842681a281 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -383,12 +383,6 @@ func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) { // Retrieve data node reference. node := h.server.DataNodeByURL(u) - // Create a new replica on the broker. - if err := h.server.Client().CreateReplica(node.ID, node.URL); err != nil { - httpError(w, err.Error(), false, http.StatusBadGateway) - return - } - // Write new node back to client. w.WriteHeader(http.StatusCreated) w.Header().Add("content-type", "application/json") diff --git a/httpd/handler_test.go b/httpd/handler_test.go index f975650913..8911fac180 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -11,14 +11,13 @@ import ( "net/url" "os" "strings" - "sync" "testing" "time" "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/httpd" "github.com/influxdb/influxdb/influxql" - "github.com/influxdb/influxdb/messaging" + "github.com/influxdb/influxdb/test" ) func init() { @@ -137,7 +136,9 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) { } func TestHandler_Databases(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateDatabase("bar") s := NewHTTPServer(srvr) @@ -152,7 +153,9 @@ func TestHandler_Databases(t *testing.T) { } func TestHandler_DatabasesPrettyPrinted(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateDatabase("bar") s := NewHTTPServer(srvr) @@ -187,7 +190,9 @@ func TestHandler_DatabasesPrettyPrinted(t *testing.T) { } func TestHandler_CreateDatabase(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -200,7 +205,9 @@ func TestHandler_CreateDatabase(t *testing.T) { } func TestHandler_CreateDatabase_BadRequest_NoName(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -211,7 +218,9 @@ func TestHandler_CreateDatabase_BadRequest_NoName(t *testing.T) { } func TestHandler_CreateDatabase_Conflict(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") s := NewHTTPServer(srvr) defer s.Close() @@ -225,7 +234,9 @@ func TestHandler_CreateDatabase_Conflict(t *testing.T) { } func TestHandler_DropDatabase(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") s := NewHTTPServer(srvr) defer s.Close() @@ -239,7 +250,9 @@ func TestHandler_DropDatabase(t *testing.T) { } func TestHandler_DropDatabase_NotFound(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -252,7 +265,9 @@ func TestHandler_DropDatabase_NotFound(t *testing.T) { } func TestHandler_RetentionPolicies(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) s := NewHTTPServer(srvr) @@ -268,7 +283,9 @@ func TestHandler_RetentionPolicies(t *testing.T) { } func TestHandler_RetentionPolicies_DatabaseNotFound(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -282,7 +299,9 @@ func TestHandler_RetentionPolicies_DatabaseNotFound(t *testing.T) { } func TestHandler_CreateRetentionPolicy(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") s := NewHTTPServer(srvr) defer s.Close() @@ -298,7 +317,9 @@ func TestHandler_CreateRetentionPolicy(t *testing.T) { } func TestHandler_CreateRetentionPolicyAsDefault(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") s := NewHTTPServer(srvr) defer s.Close() @@ -321,7 +342,9 @@ func TestHandler_CreateRetentionPolicyAsDefault(t *testing.T) { } func TestHandler_CreateRetentionPolicy_DatabaseNotFound(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -334,7 +357,9 @@ func TestHandler_CreateRetentionPolicy_DatabaseNotFound(t *testing.T) { } func TestHandler_CreateRetentionPolicy_Conflict(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") s := NewHTTPServer(srvr) defer s.Close() @@ -350,7 +375,9 @@ func TestHandler_CreateRetentionPolicy_Conflict(t *testing.T) { } func TestHandler_CreateRetentionPolicy_BadRequest(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") s := NewHTTPServer(srvr) defer s.Close() @@ -364,7 +391,9 @@ func TestHandler_CreateRetentionPolicy_BadRequest(t *testing.T) { } func TestHandler_UpdateRetentionPolicy(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) s := NewHTTPServer(srvr) @@ -394,7 +423,9 @@ func TestHandler_UpdateRetentionPolicy(t *testing.T) { } func TestHandler_UpdateRetentionPolicy_BadRequest(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) s := NewHTTPServer(srvr) @@ -410,7 +441,9 @@ func TestHandler_UpdateRetentionPolicy_BadRequest(t *testing.T) { } func TestHandler_UpdateRetentionPolicy_DatabaseNotFound(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -424,7 +457,9 @@ func TestHandler_UpdateRetentionPolicy_DatabaseNotFound(t *testing.T) { } func TestHandler_UpdateRetentionPolicy_NotFound(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) s := NewHTTPServer(srvr) @@ -440,7 +475,9 @@ func TestHandler_UpdateRetentionPolicy_NotFound(t *testing.T) { } func TestHandler_DeleteRetentionPolicy(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) s := NewHTTPServer(srvr) @@ -457,7 +494,9 @@ func TestHandler_DeleteRetentionPolicy(t *testing.T) { } func TestHandler_DeleteRetentionPolicy_DatabaseNotFound(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -472,7 +511,9 @@ func TestHandler_DeleteRetentionPolicy_DatabaseNotFound(t *testing.T) { } func TestHandler_DeleteRetentionPolicy_NotFound(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") s := NewHTTPServer(srvr) defer s.Close() @@ -488,7 +529,9 @@ func TestHandler_DeleteRetentionPolicy_NotFound(t *testing.T) { } func TestHandler_GzipEnabled(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -512,7 +555,9 @@ func TestHandler_GzipEnabled(t *testing.T) { } func TestHandler_GzipDisabled(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -536,7 +581,9 @@ func TestHandler_GzipDisabled(t *testing.T) { } func TestHandler_Index(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -552,7 +599,9 @@ func TestHandler_Index(t *testing.T) { } func TestHandler_Wait(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -568,7 +617,9 @@ func TestHandler_Wait(t *testing.T) { } func TestHandler_WaitIncrement(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) @@ -586,7 +637,9 @@ func TestHandler_WaitIncrement(t *testing.T) { } func TestHandler_WaitNoIndexSpecified(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -598,7 +651,9 @@ func TestHandler_WaitNoIndexSpecified(t *testing.T) { } func TestHandler_WaitInvalidIndexSpecified(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -610,7 +665,9 @@ func TestHandler_WaitInvalidIndexSpecified(t *testing.T) { } func TestHandler_WaitExpectTimeout(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -622,7 +679,9 @@ func TestHandler_WaitExpectTimeout(t *testing.T) { } func TestHandler_Ping(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -634,7 +693,9 @@ func TestHandler_Ping(t *testing.T) { } func TestHandler_PingHead(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -646,7 +707,9 @@ func TestHandler_PingHead(t *testing.T) { } func TestHandler_Users_MultipleUsers(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateUser("jdoe", "1337", false) srvr.CreateUser("mclark", "1337", true) srvr.CreateUser("csmith", "1337", false) @@ -664,7 +727,9 @@ func TestHandler_Users_MultipleUsers(t *testing.T) { func TestHandler_UpdateUser(t *testing.T) { t.Skip() - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateUser("jdoe", "1337", false) s := NewHTTPServer(srvr) defer s.Close() @@ -685,7 +750,9 @@ func TestHandler_UpdateUser(t *testing.T) { func TestHandler_UpdateUser_PasswordBadRequest(t *testing.T) { t.Skip() - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateUser("jdoe", "1337", false) s := NewHTTPServer(srvr) defer s.Close() @@ -700,7 +767,9 @@ func TestHandler_UpdateUser_PasswordBadRequest(t *testing.T) { func TestHandler_DataNodes(t *testing.T) { t.Skip() - srvr := OpenUninitializedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenUninitializedServer(c) srvr.CreateDataNode(MustParseURL("http://localhost:1000")) srvr.CreateDataNode(MustParseURL("http://localhost:2000")) srvr.CreateDataNode(MustParseURL("http://localhost:3000")) @@ -717,7 +786,9 @@ func TestHandler_DataNodes(t *testing.T) { func TestHandler_CreateDataNode(t *testing.T) { t.Skip() - srvr := OpenUninitializedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenUninitializedServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -731,7 +802,9 @@ func TestHandler_CreateDataNode(t *testing.T) { func TestHandler_CreateDataNode_BadRequest(t *testing.T) { t.Skip() - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -745,7 +818,9 @@ func TestHandler_CreateDataNode_BadRequest(t *testing.T) { func TestHandler_CreateDataNode_InternalServerError(t *testing.T) { t.Skip() - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -759,7 +834,9 @@ func TestHandler_CreateDataNode_InternalServerError(t *testing.T) { func TestHandler_DeleteDataNode(t *testing.T) { t.Skip() - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDataNode(MustParseURL("http://localhost:1000")) s := NewHTTPServer(srvr) defer s.Close() @@ -774,7 +851,9 @@ func TestHandler_DeleteDataNode(t *testing.T) { func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) { t.Skip() - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -789,7 +868,9 @@ func TestHandler_DeleteUser_DataNodeNotFound(t *testing.T) { // Perform a subset of endpoint testing, with authentication enabled. func TestHandler_AuthenticatedCreateAdminUser(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) s := NewAuthenticatedHTTPServer(srvr) defer s.Close() @@ -810,7 +891,9 @@ func TestHandler_AuthenticatedCreateAdminUser(t *testing.T) { } func TestHandler_AuthenticatedDatabases_Unauthorized(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) s := NewAuthenticatedHTTPServer(srvr) defer s.Close() @@ -821,7 +904,9 @@ func TestHandler_AuthenticatedDatabases_Unauthorized(t *testing.T) { } func TestHandler_AuthenticatedDatabases_AuthorizedQueryParams(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) srvr.CreateUser("lisa", "password", true) s := NewAuthenticatedHTTPServer(srvr) defer s.Close() @@ -834,7 +919,9 @@ func TestHandler_AuthenticatedDatabases_AuthorizedQueryParams(t *testing.T) { } func TestHandler_AuthenticatedDatabases_UnauthorizedQueryParams(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) srvr.CreateUser("lisa", "password", true) s := NewAuthenticatedHTTPServer(srvr) defer s.Close() @@ -847,7 +934,9 @@ func TestHandler_AuthenticatedDatabases_UnauthorizedQueryParams(t *testing.T) { } func TestHandler_AuthenticatedDatabases_AuthorizedBasicAuth(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) srvr.CreateUser("lisa", "password", true) s := NewAuthenticatedHTTPServer(srvr) defer s.Close() @@ -862,7 +951,9 @@ func TestHandler_AuthenticatedDatabases_AuthorizedBasicAuth(t *testing.T) { } func TestHandler_AuthenticatedDatabases_UnauthorizedBasicAuth(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) srvr.CreateUser("lisa", "password", true) s := NewAuthenticatedHTTPServer(srvr) defer s.Close() @@ -877,7 +968,9 @@ func TestHandler_AuthenticatedDatabases_UnauthorizedBasicAuth(t *testing.T) { } func TestHandler_GrantDBPrivilege(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) // Create a cluster admin that will grant privilege to "john". srvr.CreateUser("lisa", "password", true) // Create user that will be granted a privilege. @@ -914,7 +1007,9 @@ func TestHandler_GrantDBPrivilege(t *testing.T) { } func TestHandler_RevokeAdmin(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) // Create a cluster admin that will revoke admin from "john". srvr.CreateUser("lisa", "password", true) // Create user that will have cluster admin revoked. @@ -946,7 +1041,9 @@ func TestHandler_RevokeAdmin(t *testing.T) { } func TestHandler_RevokeDBPrivilege(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) // Create a cluster admin that will revoke privilege from "john". srvr.CreateUser("lisa", "password", true) // Create user that will have privilege revoked. @@ -980,7 +1077,9 @@ func TestHandler_RevokeDBPrivilege(t *testing.T) { } func TestHandler_DropSeries(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) s := NewHTTPServer(srvr) @@ -1001,7 +1100,9 @@ func TestHandler_DropSeries(t *testing.T) { } func TestHandler_serveWriteSeries(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) s := NewHTTPServer(srvr) @@ -1015,7 +1116,9 @@ func TestHandler_serveWriteSeries(t *testing.T) { } func TestHandler_serveWriteSeriesWithNoFields(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) s := NewHTTPServer(srvr) @@ -1033,7 +1136,9 @@ func TestHandler_serveWriteSeriesWithNoFields(t *testing.T) { } func TestHandler_serveWriteSeriesWithAuthNilUser(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) s := NewAuthenticatedHTTPServer(srvr) @@ -1052,7 +1157,9 @@ func TestHandler_serveWriteSeriesWithAuthNilUser(t *testing.T) { } func TestHandler_serveWriteSeries_noDatabaseExists(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -1070,7 +1177,9 @@ func TestHandler_serveWriteSeries_noDatabaseExists(t *testing.T) { } func TestHandler_serveWriteSeries_invalidJSON(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -1087,7 +1196,9 @@ func TestHandler_serveWriteSeries_invalidJSON(t *testing.T) { } func TestHandler_serveWriteSeries_noDatabaseSpecified(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) s := NewHTTPServer(srvr) defer s.Close() @@ -1104,7 +1215,9 @@ func TestHandler_serveWriteSeries_noDatabaseSpecified(t *testing.T) { } func TestHandler_serveWriteSeriesNonZeroTime(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) srvr.SetDefaultRetentionPolicy("foo", "bar") @@ -1145,7 +1258,9 @@ func TestHandler_serveWriteSeriesNonZeroTime(t *testing.T) { } func TestHandler_serveWriteSeriesZeroTime(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) srvr.SetDefaultRetentionPolicy("foo", "bar") @@ -1198,7 +1313,9 @@ func TestHandler_serveWriteSeriesZeroTime(t *testing.T) { } func TestHandler_serveWriteSeriesBatch(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) srvr.SetDefaultRetentionPolicy("foo", "bar") @@ -1281,7 +1398,9 @@ func TestHandler_serveWriteSeriesBatch(t *testing.T) { } func TestHandler_serveWriteSeriesFieldTypeConflict(t *testing.T) { - srvr := OpenAuthlessServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) srvr.SetDefaultRetentionPolicy("foo", "bar") @@ -1322,7 +1441,9 @@ func str2iface(strs []string) []interface{} { } func TestHandler_ProcessContinousQueries(t *testing.T) { - srvr := OpenAuthenticatedServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthenticatedServer(c) s := NewAuthenticatedHTTPServer(srvr) defer s.Close() @@ -1408,7 +1529,7 @@ func NewServer() *Server { // OpenAuthenticatedServer returns a new, open test server instance with authentication enabled. func OpenAuthenticatedServer(client influxdb.MessagingClient) *Server { s := OpenUninitializedServer(client) - if err := s.Initialize(&url.URL{Host: "127.0.0.1:8080"}); err != nil { + if err := s.Initialize(url.URL{Host: "127.0.0.1:8080"}); err != nil { panic(err.Error()) } return s @@ -1457,72 +1578,6 @@ func OpenUninitializedServer(client influxdb.MessagingClient) *Server { return s } -// TODO corylanou: evaluate how much of this should be in this package -// vs. how much should be a mocked out interface -// MessagingClient represents a test client for the messaging broker. -type MessagingClient struct { - index uint64 - c chan *messaging.Message - mu sync.Mutex // Ensure all publishing is serialized. - - PublishFunc func(*messaging.Message) (uint64, error) - CreateReplicaFunc func(replicaID uint64, connectURL *url.URL) error - DeleteReplicaFunc func(replicaID uint64) error - SubscribeFunc func(replicaID, topicID uint64) error - UnsubscribeFunc func(replicaID, topicID uint64) error -} - -// NewMessagingClient returns a new instance of MessagingClient. -func NewMessagingClient() *MessagingClient { - c := &MessagingClient{c: make(chan *messaging.Message, 1)} - c.PublishFunc = c.send - c.CreateReplicaFunc = func(replicaID uint64, connectURL *url.URL) error { return nil } - c.DeleteReplicaFunc = func(replicaID uint64) error { return nil } - c.SubscribeFunc = func(replicaID, topicID uint64) error { return nil } - c.UnsubscribeFunc = func(replicaID, topicID uint64) error { return nil } - return c -} - -// Publish attaches an autoincrementing index to the message. -// This function also execute's the client's PublishFunc mock function. -func (c *MessagingClient) Publish(m *messaging.Message) (uint64, error) { - c.mu.Lock() - defer c.mu.Unlock() - c.index++ - m.Index = c.index - return c.PublishFunc(m) -} - -// send sends the message through to the channel. -// This is the default value of PublishFunc. -func (c *MessagingClient) send(m *messaging.Message) (uint64, error) { - c.c <- m - return m.Index, nil -} - -// Creates a new replica with a given ID on the broker. -func (c *MessagingClient) CreateReplica(replicaID uint64, connectURL *url.URL) error { - return c.CreateReplicaFunc(replicaID, connectURL) -} - -// Deletes an existing replica with a given ID from the broker. -func (c *MessagingClient) DeleteReplica(replicaID uint64) error { - return c.DeleteReplicaFunc(replicaID) -} - -// Subscribe adds a subscription to a replica for a topic on the broker. -func (c *MessagingClient) Subscribe(replicaID, topicID uint64) error { - return c.SubscribeFunc(replicaID, topicID) -} - -// Unsubscribe removes a subscrition from a replica for a topic on the broker. -func (c *MessagingClient) Unsubscribe(replicaID, topicID uint64) error { - return c.UnsubscribeFunc(replicaID, topicID) -} - -// C returns a channel for streaming message. -func (c *MessagingClient) C() <-chan *messaging.Message { return c.c } - // tempfile returns a temporary path. func tempfile() string { f, _ := ioutil.TempFile("", "influxdb-") diff --git a/messaging/broker.go b/messaging/broker.go index 28b3b765ce..c1fc503494 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log" + "net/url" "os" "path/filepath" "sort" @@ -34,6 +35,9 @@ type Broker struct { // Log is the distributed raft log that commands are applied to. Log interface { + URL() url.URL + Leader() (uint64, url.URL) + ClusterID() uint64 Apply(data []byte) (index uint64, err error) } @@ -61,6 +65,18 @@ func (b *Broker) metaPath() string { return filepath.Join(b.path, "meta") } +// URL returns the URL of the broker. +func (b *Broker) URL() url.URL { return b.Log.URL() } + +// LeaderURL returns the URL to the leader broker. +func (b *Broker) LeaderURL() url.URL { + _, u := b.Log.Leader() + return u +} + +// ClusterID returns the identifier for the cluster. +func (b *Broker) ClusterID() uint64 { return b.Log.ClusterID() } + // TopicPath returns the file path to a topic's data. // Returns a blank string if the broker is closed. func (b *Broker) TopicPath(id uint64) string { @@ -86,10 +102,10 @@ func (b *Broker) Topic(id uint64) *Topic { // Index returns the highest index seen by the broker across all topics. // Returns 0 if the broker is closed. -func (b *Broker) Index() uint64 { +func (b *Broker) Index() (uint64, error) { b.mu.RLock() defer b.mu.RUnlock() - return b.index + return b.index, nil } // opened returns true if the broker is in an open and running state. @@ -268,7 +284,7 @@ func (b *Broker) createSnapshotHeader() (*snapshotHeader, error) { // Read segments from disk, not topic. segments, err := ReadSegments(t.path) - if err != nil { + if err != nil && !os.IsNotExist(err) { return nil, fmt.Errorf("read segments: %s", err) } @@ -401,7 +417,7 @@ func (b *Broker) Publish(m *Message) (uint64, error) { } // TopicReader returns a new topic reader for a topic starting from a given index. -func (b *Broker) TopicReader(topicID, index uint64, streaming bool) *TopicReader { +func (b *Broker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser { return NewTopicReader(b.TopicPath(topicID), index, streaming) } @@ -617,7 +633,7 @@ func (t *Topic) Open() error { // Read available segments. segments, err := ReadSegments(t.path) - if err != nil { + if err != nil && !os.IsNotExist(err) { t.close() return fmt.Errorf("read segments: %s", err) } @@ -669,7 +685,7 @@ func (t *Topic) close() error { func (t *Topic) ReadIndex() (uint64, error) { // Read a list of all segments. segments, err := ReadSegments(t.path) - if err != nil { + if err != nil && !os.IsNotExist(err) { return 0, fmt.Errorf("read segments: %s", err) } @@ -839,7 +855,9 @@ func ReadSegments(path string) (Segments, error) { func ReadSegmentByIndex(path string, index uint64) (*Segment, error) { // Find a list of all segments. segments, err := ReadSegments(path) - if err != nil { + if os.IsNotExist(err) { + return nil, err + } else if err != nil { return nil, fmt.Errorf("read segments: %s", err) } @@ -869,7 +887,9 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) { func ReadSegmentMaxIndex(path string) (uint64, error) { // Open segment file. f, err := os.Open(path) - if err != nil { + if os.IsNotExist(err) { + return 0, err + } else if err != nil { return 0, fmt.Errorf("open: %s", err) } defer func() { _ = f.Close() }() @@ -966,9 +986,11 @@ func (r *TopicReader) File() (*os.File, error) { // If the first file hasn't been opened then open it and seek. if r.file == nil { // Find the segment containing the index. - // Exit if no segments are available. + // Exit if no segments are available or if path not found. segment, err := ReadSegmentByIndex(r.path, r.index) - if err != nil { + if os.IsNotExist(err) { + return nil, nil + } else if err != nil { return nil, fmt.Errorf("segment by index: %s", err) } else if segment == nil { return nil, nil @@ -1032,7 +1054,9 @@ func (r *TopicReader) nextSegment() error { // If no segments exist then exit. // If current segment is the last segment then ignore. segments, err := ReadSegments(r.path) - if err != nil { + if os.IsNotExist(err) { + return nil + } else if err != nil { return fmt.Errorf("read segments: %s", err) } else if len(segments) == 0 { return nil diff --git a/messaging/broker_test.go b/messaging/broker_test.go index 78ac8d22c5..2f7d30984a 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -6,6 +6,7 @@ import ( "io" "io/ioutil" "math/rand" + "net/url" "os" "path/filepath" "reflect" @@ -18,6 +19,11 @@ import ( "github.com/influxdb/influxdb/raft" ) +func init() { + // Ensure the broker matches the handler's interface. + _ = messaging.Handler{Broker: messaging.NewBroker()} +} + // Ensure that opening a broker without a path returns an error. func TestBroker_Open_ErrPathRequired(t *testing.T) { b := messaging.NewBroker() @@ -97,7 +103,7 @@ func TestBroker_Apply(t *testing.T) { } // Verify broker high water mark. - if index := b.Index(); index != 4 { + if index, _ := b.Index(); index != 4 { t.Fatalf("unexpected broker index: %d", index) } } @@ -152,7 +158,7 @@ func TestBroker_Reopen(t *testing.T) { } // Verify broker high water mark. - if index := b.Index(); index != 4 { + if index, _ := b.Index(); index != 4 { t.Fatalf("unexpected broker index: %d", index) } @@ -218,7 +224,7 @@ func TestBroker_Snapshot(t *testing.T) { } // Verify broker high water mark. - if index := b1.Index(); index != 4 { + if index, _ := b1.Index(); index != 4 { t.Fatalf("unexpected broker index: %d", index) } } @@ -710,10 +716,16 @@ func (b *Broker) MustReadAllTopic(topicID uint64) (a []*messaging.Message) { // BrokerLog is a mockable object that implements Broker.Log. type BrokerLog struct { - ApplyFunc func(data []byte) (uint64, error) + ApplyFunc func(data []byte) (uint64, error) + ClusterIDFunc func() uint64 + LeaderFunc func() (uint64, url.URL) + URLFunc func() url.URL } func (l *BrokerLog) Apply(data []byte) (uint64, error) { return l.ApplyFunc(data) } +func (l *BrokerLog) ClusterID() uint64 { return l.ClusterIDFunc() } +func (l *BrokerLog) Leader() (uint64, url.URL) { return l.LeaderFunc() } +func (l *BrokerLog) URL() url.URL { return l.URLFunc() } // Messages represents a collection of messages. // This type provides helper functions. diff --git a/messaging/client.go b/messaging/client.go index 6f90f5c735..5acd3da9bf 100644 --- a/messaging/client.go +++ b/messaging/client.go @@ -252,10 +252,11 @@ func NewClientConfig(u []url.URL) *ClientConfig { // Conn represents a stream over the client for a single topic. type Conn struct { - mu sync.Mutex - topicID uint64 // topic identifier - index uint64 // highest index sent over the channel - url url.URL // current broker url + mu sync.Mutex + topicID uint64 // topic identifier + index uint64 // highest index sent over the channel + streaming bool // use streaming reader, if true + url url.URL // current broker url opened bool c chan *Message // channel streams messages from the broker. @@ -299,6 +300,13 @@ func (c *Conn) SetIndex(index uint64) { c.index = index } +// Streaming returns true if the connection streams messages continuously. +func (c *Conn) Streaming() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.streaming +} + // URL returns the current URL of the connection. func (c *Conn) URL() url.URL { c.mu.Lock() @@ -314,7 +322,7 @@ func (c *Conn) SetURL(u url.URL) { } // Open opens a streaming connection to the broker. -func (c *Conn) Open(index uint64) error { +func (c *Conn) Open(index uint64, streaming bool) error { c.mu.Lock() defer c.mu.Unlock() @@ -328,6 +336,7 @@ func (c *Conn) Open(index uint64) error { // Set starting index. c.index = index + c.streaming = streaming // Create streaming channel. c.c = make(chan *Message, 0) @@ -430,8 +439,9 @@ func (c *Conn) streamer(closing <-chan struct{}) { u := c.URL() u.Path = "/messaging/messages" u.RawQuery = url.Values{ - "topicID": {strconv.FormatUint(c.topicID, 10)}, - "index": {strconv.FormatUint(c.Index(), 10)}, + "topicID": {strconv.FormatUint(c.topicID, 10)}, + "index": {strconv.FormatUint(c.Index(), 10)}, + "streaming": {strconv.FormatBool(c.Streaming())}, }.Encode() // Create request. @@ -479,6 +489,7 @@ func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error { // Decode message from the stream. m := &Message{} if err := dec.Decode(m); err == io.EOF { + warn("EOF!!!") return nil } else if err != nil { return fmt.Errorf("decode: %s", err) diff --git a/messaging/client_test.go b/messaging/client_test.go index 15c6039b72..1b7f0fdf92 100644 --- a/messaging/client_test.go +++ b/messaging/client_test.go @@ -31,7 +31,7 @@ func TestClient_Conn(t *testing.T) { // Connect on topic #1. conn1 := c.Conn(1) - if err := conn1.Open(0); err != nil { + if err := conn1.Open(0, false); err != nil { t.Fatal(err) } else if m := <-conn1.C(); !reflect.DeepEqual(m, &messaging.Message{Index: 1, Data: []byte{100}}) { t.Fatalf("unexpected message(1): %#v", m) @@ -39,7 +39,7 @@ func TestClient_Conn(t *testing.T) { // Connect on topic #2. conn2 := c.Conn(2) - if err := conn2.Open(0); err != nil { + if err := conn2.Open(0, false); err != nil { t.Fatal(err) } else if m := <-conn2.C(); !reflect.DeepEqual(m, &messaging.Message{Index: 2, Data: []byte{200}}) { t.Fatalf("unexpected message(2): %#v", m) @@ -54,9 +54,9 @@ func TestClient_Conn(t *testing.T) { // Ensure that an error is returned when opening an opened connection. func TestConn_Open_ErrConnOpen(t *testing.T) { c := messaging.NewConn(1) - c.Open(0) + c.Open(0, false) defer c.Close() - if err := c.Open(0); err != messaging.ErrConnOpen { + if err := c.Open(0, false); err != messaging.ErrConnOpen { t.Fatalf("unexpected error: %s", err) } } @@ -64,9 +64,9 @@ func TestConn_Open_ErrConnOpen(t *testing.T) { // Ensure that an error is returned when opening a previously closed connection. func TestConn_Open_ErrConnCannotReuse(t *testing.T) { c := messaging.NewConn(1) - c.Open(0) + c.Open(0, false) c.Close() - if err := c.Open(0); err != messaging.ErrConnCannotReuse { + if err := c.Open(0, false); err != messaging.ErrConnCannotReuse { t.Fatalf("unexpected error: %s", err) } } @@ -74,7 +74,7 @@ func TestConn_Open_ErrConnCannotReuse(t *testing.T) { // Ensure that an error is returned when closing a closed connection. func TestConn_Close_ErrConnClosed(t *testing.T) { c := messaging.NewConn(1) - c.Open(0) + c.Open(0, false) c.Close() if err := c.Close(); err != messaging.ErrConnClosed { t.Fatalf("unexpected error: %s", err) @@ -102,7 +102,7 @@ func TestConn_Open(t *testing.T) { // Create and open connection to server. c := messaging.NewConn(100) c.SetURL(*MustParseURL(s.URL)) - if err := c.Open(200); err != nil { + if err := c.Open(200, false); err != nil { t.Fatal(err) } @@ -139,7 +139,7 @@ func TestConn_Open_Reconnect(t *testing.T) { // Create and open connection to server. c := messaging.NewConn(100) c.SetURL(*MustParseURL(s.URL)) - if err := c.Open(0); err != nil { + if err := c.Open(0, false); err != nil { t.Fatal(err) } @@ -179,124 +179,6 @@ func TestConn_Heartbeat(t *testing.T) { } } -/* -// Ensure that a client can open a connect to the broker. -func TestClient_Open(t *testing.T) { - c := NewClient() - defer c.Close() - - // Create replica on broker. - c.Server.Handler.Broker().PublishSync() - - // Open client to broker. - f := NewTempFile() - defer os.Remove(f) - u, _ := url.Parse(c.Server.URL) - if err := c.Open(f, []*url.URL{u}); err != nil { - t.Fatalf("unexpected error: %s", err) - } - - // Receive messages from the stream. - if m := <-c.C(); m.Type != messaging.InternalMessageType { - t.Fatalf("message type mismatch(internal): %x", m.Type) - } else if m = <-c.C(); m.Type != messaging.CreateReplicaMessageType { - t.Fatalf("message type mismatch(create replica): %x", m.Type) - } - - // Close connection to the broker. - if err := c.Client.Close(); err != nil { - t.Fatalf("unexpected error: %s", err) - } -} - -// Ensure that opening an already open client returns an error. -func TestClient_Open_ErrClientOpen(t *testing.T) { - c := NewClient(1000) - defer c.Close() - - // Open client to broker. - f := NewTempFile() - defer os.Remove(f) - u, _ := url.Parse(c.Server.URL) - c.Open(f, []*url.URL{u}) - if err := c.Open(f, []*url.URL{u}); err != messaging.ErrClientOpen { - t.Fatalf("unexpected error: %s", err) - } -} - -// Ensure that opening a client without a broker URL returns an error. -func TestClient_Open_ErrBrokerURLRequired(t *testing.T) { - t.Skip() - c := NewClient(1000) - defer c.Close() - f := NewTempFile() - defer os.Remove(f) - if err := c.Open(f, []*url.URL{}); err != messaging.ErrBrokerURLRequired { - t.Fatalf("unexpected error: %s", err) - } -} - -// Ensure that a client can close while a message is pending. -func TestClient_Close(t *testing.T) { - c := NewClient(1000) - defer c.Close() - - // Create replica on broker. - c.Server.Handler.Broker().CreateReplica(1000, &url.URL{Host: "localhost"}) - - // Open client to broker. - f := NewTempFile() - defer os.Remove(f) - u, _ := url.Parse(c.Server.URL) - if err := c.Open(f, []*url.URL{u}); err != nil { - t.Fatalf("unexpected error: %s", err) - } - time.Sleep(10 * time.Millisecond) - - // Close connection to the broker. - if err := c.Client.Close(); err != nil { - t.Fatalf("unexpected error: %s", err) - } -} - -// Ensure that a client can publish messages to the broker. -func TestClient_Publish(t *testing.T) { - c := OpenClient(1000) - defer c.Close() - - // Publish message to the broker. - if index, err := c.Publish(&messaging.Message{Type: 100, TopicID: messaging.BroadcastTopicID, Data: []byte{0}}); err != nil { - t.Fatalf("unexpected error: %v", err) - } else if index != 3 { - t.Fatalf("unexpected index: %d", index) - } -} - -// Ensure that a client receives an error when publishing to a stopped server. -func TestClient_Publish_ErrConnectionRefused(t *testing.T) { - c := OpenClient(1000) - c.Server.Close() - defer c.Close() - - // Publish message to the broker. - if _, err := c.Publish(&messaging.Message{Type: 100, TopicID: 0, Data: []byte{0}}); err == nil || !strings.Contains(err.Error(), "connection refused") { - t.Fatalf("unexpected error: %v", err) - } -} - -// Ensure that a client receives an error when publishing to a closed broker. -func TestClient_Publish_ErrLogClosed(t *testing.T) { - c := OpenClient(1000) - c.Server.Handler.Broker().Close() - defer c.Close() - - // Publish message to the broker. - if _, err := c.Publish(&messaging.Message{Type: 100, TopicID: 0, Data: []byte{0}}); err == nil || err.Error() != "log closed" { - t.Fatalf("unexpected error: %v", err) - } -} -*/ - // Client represents a test wrapper for the broker client. type Client struct { *messaging.Client diff --git a/messaging/handler.go b/messaging/handler.go index c302da8335..9c7bef5186 100644 --- a/messaging/handler.go +++ b/messaging/handler.go @@ -3,6 +3,7 @@ package messaging import ( "io" "io/ioutil" + "log" "net/http" "net/url" "strconv" @@ -14,7 +15,7 @@ import ( // Handler represents an HTTP handler by the broker. type Handler struct { Broker interface { - LeaderURL() *url.URL + LeaderURL() url.URL TopicReader(topicID, index uint64, streaming bool) io.ReadCloser Publish(m *Message) (uint64, error) SetTopicMaxIndex(topicID, index uint64) error @@ -76,16 +77,24 @@ func (h *Handler) getMessages(w http.ResponseWriter, req *http.Request) { defer r.Close() // Ensure we close the topic reader if the connection is disconnected. + done := make(chan struct{}, 0) + defer close(done) if w, ok := w.(http.CloseNotifier); ok { go func() { select { case <-w.CloseNotify(): + _ = r.Close() + case <-done: + return } }() } // Write out all data from the topic reader. - io.Copy(w, r) + // Automatically flush as reads come in. + if _, err := CopyFlush(w, r); err != nil { + log.Printf("message stream error: %s", err) + } } // postMessages publishes a message to the broker. @@ -162,7 +171,7 @@ func (h *Handler) error(w http.ResponseWriter, err error, code int) { // redirects to the current known leader. // If no leader is found then returns a 500. func (h *Handler) redirectToLeader(w http.ResponseWriter, r *http.Request) { - if u := h.Broker.LeaderURL(); u != nil { + if u := h.Broker.LeaderURL(); u.Host != "" { redirectURL := *r.URL redirectURL.Scheme = u.Scheme redirectURL.Host = u.Host @@ -172,3 +181,39 @@ func (h *Handler) redirectToLeader(w http.ResponseWriter, r *http.Request) { h.error(w, raft.ErrNotLeader, http.StatusInternalServerError) } + +// CopyFlush copies from src to dst until EOF or an error occurs. +// Each write is proceeded by a flush, if the writer implements http.Flusher. +// +// This implementation is copied from io.Copy(). +func CopyFlush(dst io.Writer, src io.Reader) (written int64, err error) { + buf := make([]byte, 32*1024) + for { + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if nw > 0 { + written += int64(nw) + } + + // Flush after write. + if dst, ok := dst.(http.Flusher); ok { + dst.Flush() + } + + if ew != nil { + err = ew + break + } else if nr != nw { + err = io.ErrShortWrite + break + } + } else if er == io.EOF { + break + } else if er != nil { + err = er + break + } + } + return written, err +} diff --git a/messaging/handler_test.go b/messaging/handler_test.go index 9cf307d23c..328d717eda 100644 --- a/messaging/handler_test.go +++ b/messaging/handler_test.go @@ -224,13 +224,13 @@ func TestHandler_ErrNotFound(t *testing.T) { // HandlerBroker is a mockable type that implements Handler.Broker. type HandlerBroker struct { - LeaderURLFunc func() *url.URL + LeaderURLFunc func() url.URL PublishFunc func(m *messaging.Message) (uint64, error) TopicReaderFunc func(topicID, index uint64, streaming bool) io.ReadCloser SetTopicMaxIndexFunc func(topicID, index uint64) error } -func (b *HandlerBroker) LeaderURL() *url.URL { return b.LeaderURLFunc() } +func (b *HandlerBroker) LeaderURL() url.URL { return b.LeaderURLFunc() } func (b *HandlerBroker) Publish(m *messaging.Message) (uint64, error) { return b.PublishFunc(m) } func (b *HandlerBroker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser { return b.TopicReaderFunc(topicID, index, streaming) diff --git a/raft/config.go b/raft/config.go index 4b81e3702b..a8d6253e78 100644 --- a/raft/config.go +++ b/raft/config.go @@ -33,7 +33,7 @@ func (c *Config) NodeByID(id uint64) *ConfigNode { } // NodeByURL returns a node by URL. -func (c *Config) NodeByURL(u *url.URL) *ConfigNode { +func (c *Config) NodeByURL(u url.URL) *ConfigNode { for _, n := range c.Nodes { if n.URL.String() == u.String() { return n @@ -43,11 +43,11 @@ func (c *Config) NodeByURL(u *url.URL) *ConfigNode { } // AddNode adds a new node to the config. -func (c *Config) AddNode(id uint64, u *url.URL) error { +func (c *Config) AddNode(id uint64, u url.URL) error { // Validate that the id is non-zero and the url exists. if id == 0 { return ErrInvalidNodeID - } else if u == nil { + } else if u.Host == "" { return ErrNodeURLRequired } @@ -97,13 +97,13 @@ func (c *Config) Clone() *Config { // ConfigNode represents a single machine in the raft configuration. type ConfigNode struct { ID uint64 - URL *url.URL + URL url.URL } // clone returns a deep copy of the node. func (n *ConfigNode) clone() *ConfigNode { - other := &ConfigNode{ID: n.ID, URL: &url.URL{}} - *other.URL = *n.URL + other := &ConfigNode{ID: n.ID} + other.URL = n.URL return other } @@ -162,11 +162,11 @@ func (dec *ConfigDecoder) Decode(c *Config) error { if err != nil { return err } else if n.URL == "" { - u = nil + u = &url.URL{} } // Append node to config. - if err := c.AddNode(n.ID, u); err != nil { + if err := c.AddNode(n.ID, *u); err != nil { return err } } diff --git a/raft/config_test.go b/raft/config_test.go index 451aa5ce58..d40cd2af48 100644 --- a/raft/config_test.go +++ b/raft/config_test.go @@ -14,8 +14,8 @@ import ( func TestConfig_NodeByID(t *testing.T) { c := &raft.Config{ Nodes: []*raft.ConfigNode{ - {ID: 1, URL: &url.URL{Host: "localhost:8000"}}, - {ID: 2, URL: &url.URL{Host: "localhost:9000"}}, + {ID: 1, URL: url.URL{Host: "localhost:8000"}}, + {ID: 2, URL: url.URL{Host: "localhost:9000"}}, }, } @@ -34,18 +34,18 @@ func TestConfig_NodeByID(t *testing.T) { func TestConfig_NodeByURL(t *testing.T) { c := &raft.Config{ Nodes: []*raft.ConfigNode{ - {ID: 1, URL: &url.URL{Host: "localhost:8000"}}, - {ID: 2, URL: &url.URL{Host: "localhost:9000"}}, + {ID: 1, URL: url.URL{Host: "localhost:8000"}}, + {ID: 2, URL: url.URL{Host: "localhost:9000"}}, }, } // Matching nodes should return the correct node. - if n := c.NodeByURL(&url.URL{Host: "localhost:8000"}); n != c.Nodes[0] { + if n := c.NodeByURL(url.URL{Host: "localhost:8000"}); n != c.Nodes[0] { t.Fatalf("unexpected node: %#v", n) } // Non-existent nodes should return nil. - if n := c.NodeByURL(&url.URL{Scheme: "http", Host: "localhost:8000"}); n != nil { + if n := c.NodeByURL(url.URL{Scheme: "http", Host: "localhost:8000"}); n != nil { t.Fatalf("expected nil node: %#v", n) } } @@ -53,11 +53,11 @@ func TestConfig_NodeByURL(t *testing.T) { // Ensure that the config can add nodes. func TestConfig_AddNode(t *testing.T) { var c raft.Config - c.AddNode(1, &url.URL{Host: "localhost:8000"}) - c.AddNode(2, &url.URL{Host: "localhost:9000"}) - if n := c.Nodes[0]; !reflect.DeepEqual(n, &raft.ConfigNode{ID: 1, URL: &url.URL{Host: "localhost:8000"}}) { + c.AddNode(1, url.URL{Host: "localhost:8000"}) + c.AddNode(2, url.URL{Host: "localhost:9000"}) + if n := c.Nodes[0]; !reflect.DeepEqual(n, &raft.ConfigNode{ID: 1, URL: url.URL{Host: "localhost:8000"}}) { t.Fatalf("unexpected node(0): %#v", n) - } else if n = c.Nodes[1]; !reflect.DeepEqual(n, &raft.ConfigNode{ID: 2, URL: &url.URL{Host: "localhost:9000"}}) { + } else if n = c.Nodes[1]; !reflect.DeepEqual(n, &raft.ConfigNode{ID: 2, URL: url.URL{Host: "localhost:9000"}}) { t.Fatalf("unexpected node(1): %#v", n) } } @@ -65,8 +65,8 @@ func TestConfig_AddNode(t *testing.T) { // Ensure that the config can remove nodes. func TestConfig_RemoveNode(t *testing.T) { var c raft.Config - c.AddNode(1, &url.URL{Host: "localhost:8000"}) - c.AddNode(2, &url.URL{Host: "localhost:9000"}) + c.AddNode(1, url.URL{Host: "localhost:8000"}) + c.AddNode(2, url.URL{Host: "localhost:9000"}) if err := c.RemoveNode(1); err != nil { t.Fatalf("unexpected error(0): %s", err) } else if err = c.RemoveNode(2); err != nil { @@ -83,8 +83,8 @@ func TestConfigEncoder_Encode(t *testing.T) { Index: 20, MaxNodeID: 3, Nodes: []*raft.ConfigNode{ - {ID: 1, URL: &url.URL{Host: "localhost:8000"}}, - {ID: 2, URL: &url.URL{Host: "localhost:9000"}}, + {ID: 1, URL: url.URL{Host: "localhost:8000"}}, + {ID: 2, URL: url.URL{Host: "localhost:9000"}}, }, } @@ -103,8 +103,8 @@ func TestConfigDecoder_Decode(t *testing.T) { Index: 20, MaxNodeID: 3, Nodes: []*raft.ConfigNode{ - {ID: 1, URL: &url.URL{Host: "localhost:8000"}}, - {ID: 2, URL: &url.URL{Host: "localhost:9000"}}, + {ID: 1, URL: url.URL{Host: "localhost:8000"}}, + {ID: 2, URL: url.URL{Host: "localhost:9000"}}, }, } diff --git a/raft/handler.go b/raft/handler.go index 0e271e60a4..2d517165e1 100644 --- a/raft/handler.go +++ b/raft/handler.go @@ -12,7 +12,7 @@ import ( // Handler represents an HTTP endpoint for Raft to communicate over. type Handler struct { Log interface { - AddPeer(u *url.URL) (id uint64, leaderID uint64, config *Config, err error) + AddPeer(u url.URL) (id uint64, leaderID uint64, config *Config, err error) RemovePeer(id uint64) error Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) WriteEntriesTo(w io.Writer, id, term, index uint64) error @@ -60,7 +60,7 @@ func (h *Handler) serveJoin(w http.ResponseWriter, r *http.Request) { } // Add peer to the log. - id, leaderID, config, err := h.Log.AddPeer(u) + id, leaderID, config, err := h.Log.AddPeer(*u) if err != nil { w.Header().Set("X-Raft-Error", err.Error()) w.WriteHeader(http.StatusInternalServerError) diff --git a/raft/handler_test.go b/raft/handler_test.go index ed5e64f49c..bb36ac19cf 100644 --- a/raft/handler_test.go +++ b/raft/handler_test.go @@ -11,10 +11,15 @@ import ( "github.com/influxdb/influxdb/raft" ) +func init() { + // Ensure Log implements the Handler.Log interface. + _ = raft.Handler{Log: raft.NewLog()} +} + // Ensure a node can join a cluster over HTTP. func TestHandler_HandleJoin(t *testing.T) { h := NewHandler() - h.AddPeerFunc = func(u *url.URL) (uint64, uint64, *raft.Config, error) { + h.AddPeerFunc = func(u url.URL) (uint64, uint64, *raft.Config, error) { if u.String() != "http://localhost:1000" { t.Fatalf("unexpected url: %s", u) } @@ -42,7 +47,7 @@ func TestHandler_HandleJoin(t *testing.T) { // Ensure that joining with an invalid query string with return an error. func TestHandler_HandleJoin_Error(t *testing.T) { h := NewHandler() - h.AddPeerFunc = func(u *url.URL) (uint64, uint64, *raft.Config, error) { + h.AddPeerFunc = func(u url.URL) (uint64, uint64, *raft.Config, error) { return 0, 0, nil, raft.ErrClosed } s := httptest.NewServer(h) @@ -364,7 +369,7 @@ func TestHandler_Ping(t *testing.T) { // Handler represents a test wrapper for the raft.Handler. type Handler struct { *raft.Handler - AddPeerFunc func(u *url.URL) (uint64, uint64, *raft.Config, error) + AddPeerFunc func(u url.URL) (uint64, uint64, *raft.Config, error) RemovePeerFunc func(id uint64) error HeartbeatFunc func(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) WriteEntriesToFunc func(w io.Writer, id, term, index uint64) error @@ -378,8 +383,8 @@ func NewHandler() *Handler { return h } -func (h *Handler) AddPeer(u *url.URL) (uint64, uint64, *raft.Config, error) { return h.AddPeerFunc(u) } -func (h *Handler) RemovePeer(id uint64) error { return h.RemovePeerFunc(id) } +func (h *Handler) AddPeer(u url.URL) (uint64, uint64, *raft.Config, error) { return h.AddPeerFunc(u) } +func (h *Handler) RemovePeer(id uint64) error { return h.RemovePeerFunc(id) } func (h *Handler) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) { return h.HeartbeatFunc(term, commitIndex, leaderID) diff --git a/raft/log.go b/raft/log.go index 9c16025804..f41f4864d9 100644 --- a/raft/log.go +++ b/raft/log.go @@ -104,18 +104,18 @@ type Log struct { closing chan struct{} // close notification // Network address to the reach the log. - URL *url.URL + url url.URL // The state machine that log entries will be applied to. FSM FSM // The transport used to communicate with other nodes in the cluster. Transport interface { - Join(u *url.URL, nodeURL *url.URL) (id uint64, leaderID uint64, config *Config, err error) - Leave(u *url.URL, id uint64) error - Heartbeat(u *url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error) - ReadFrom(u *url.URL, id, term, index uint64) (io.ReadCloser, error) - RequestVote(u *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error + Join(u url.URL, nodeURL url.URL) (id uint64, leaderID uint64, config *Config, err error) + Leave(u url.URL, id uint64) error + Heartbeat(u url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error) + ReadFrom(u url.URL, id, term, index uint64) (io.ReadCloser, error) + RequestVote(u url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error } // Clock is an abstraction of time. @@ -154,6 +154,25 @@ func NewLog() *Log { // Returns an empty string if the log is closed. func (l *Log) Path() string { return l.path } +// URL returns the URL for the log. +func (l *Log) URL() url.URL { + l.mu.Lock() + defer l.mu.Unlock() + return l.url +} + +// SetURL sets the URL for the log. This must be set before opening. +func (l *Log) SetURL(u url.URL) { + l.mu.Lock() + defer l.mu.Unlock() + + if l.opened() { + panic("url cannot be set while log is open") + } + + l.url = u +} + func (l *Log) idPath() string { return filepath.Join(l.path, "id") } func (l *Log) termPath() string { return filepath.Join(l.path, "term") } func (l *Log) configPath() string { return filepath.Join(l.path, "config") } @@ -459,7 +478,7 @@ func (l *Log) Initialize() error { // Generate a new configuration with one node. config = &Config{MaxNodeID: id} - config.AddNode(id, l.URL) + config.AddNode(id, l.url) // Generate new 8-hex digit cluster identifier. config.ClusterID = uint64(l.Rand()) @@ -511,8 +530,8 @@ func (l *Log) SetLogOutput(w io.Writer) { func (l *Log) updateLogPrefix() { var host string - if l.URL != nil { - host = l.URL.Host + if l.url.Host != "" { + host = l.url.Host } l.Logger.SetPrefix(fmt.Sprintf("[raft] %s ", host)) } @@ -533,13 +552,24 @@ func (l *Log) tracef(msg string, v ...interface{}) { // Leader returns the id and URL associated with the current leader. // Returns zero if there is no current leader. -func (l *Log) Leader() (id uint64, u *url.URL) { +func (l *Log) Leader() (id uint64, u url.URL) { l.mu.Lock() defer l.mu.Unlock() return l.leader() } -func (l *Log) leader() (id uint64, u *url.URL) { +// ClusterID returns the identifier for the cluster. +// Returns zero if the cluster has not been initialized yet. +func (l *Log) ClusterID() uint64 { + l.mu.Lock() + defer l.mu.Unlock() + if l.config == nil { + return 0 + } + return l.config.ClusterID +} + +func (l *Log) leader() (id uint64, u url.URL) { // Ignore if there's no configuration set. if l.config == nil { return @@ -556,9 +586,9 @@ func (l *Log) leader() (id uint64, u *url.URL) { // Join contacts a node in the cluster to request membership. // A log cannot join a cluster if it has already been initialized. -func (l *Log) Join(u *url.URL) error { +func (l *Log) Join(u url.URL) error { // Validate under lock. - var nodeURL *url.URL + var nodeURL url.URL if err := func() error { l.mu.Lock() defer l.mu.Unlock() @@ -567,11 +597,11 @@ func (l *Log) Join(u *url.URL) error { return ErrClosed } else if l.id != 0 { return ErrInitialized - } else if l.URL == nil { + } else if l.url.Host == "" { return ErrURLRequired } - nodeURL = l.URL + nodeURL = l.url return nil }(); err != nil { return err @@ -727,7 +757,7 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup, transitioning <-chan struct{}) l.mu.Unlock() // If no leader exists then wait momentarily and retry. - if u == nil { + if u.Host == "" { l.tracef("readFromLeader: no leader") time.Sleep(100 * time.Millisecond) continue @@ -1309,9 +1339,9 @@ func (l *Log) mustApplyRemovePeer(e *LogEntry) error { // AddPeer creates a new peer in the cluster. // Returns the new peer's identifier and the current configuration. -func (l *Log) AddPeer(u *url.URL) (uint64, uint64, *Config, error) { +func (l *Log) AddPeer(u url.URL) (uint64, uint64, *Config, error) { // Validate URL. - if u == nil { + if u.Host == "" { return 0, 0, nil, fmt.Errorf("peer url required") } diff --git a/raft/log_test.go b/raft/log_test.go index 6d18fc8945..92145b8cdd 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -19,7 +19,7 @@ import ( // Ensure that opening an already open log returns an error. func TestLog_Open_ErrOpen(t *testing.T) { - l := NewInitializedLog(&url.URL{Host: "log0"}) + l := NewInitializedLog(url.URL{Host: "log0"}) defer l.Close() if err := l.Open(tempfile()); err != raft.ErrOpen { t.Fatal("expected error") @@ -28,7 +28,7 @@ func TestLog_Open_ErrOpen(t *testing.T) { // Ensure that a log can be checked for being open. func TestLog_Opened(t *testing.T) { - l := NewInitializedLog(&url.URL{Host: "log0"}) + l := NewInitializedLog(url.URL{Host: "log0"}) if l.Opened() != true { t.Fatalf("expected open") } @@ -40,7 +40,7 @@ func TestLog_Opened(t *testing.T) { // Ensure that reopening an existing log will restore its ID. func TestLog_Reopen(t *testing.T) { - l := NewInitializedLog(&url.URL{Host: "log0"}) + l := NewInitializedLog(url.URL{Host: "log0"}) if l.ID() != 1 { t.Fatalf("expected id == 1") } @@ -64,7 +64,7 @@ func TestLog_Reopen(t *testing.T) { // Ensure that a single node-cluster can apply a log entry. func TestLog_Apply(t *testing.T) { - l := NewInitializedLog(&url.URL{Host: "log0"}) + l := NewInitializedLog(url.URL{Host: "log0"}) defer l.Close() // Apply a command. @@ -87,7 +87,7 @@ func TestLog_Apply(t *testing.T) { // Ensure that a node has no configuration after it's closed. func TestLog_Config_Closed(t *testing.T) { - l := NewInitializedLog(&url.URL{Host: "log0"}) + l := NewInitializedLog(url.URL{Host: "log0"}) defer l.Close() l.Log.Close() if l.Config() != nil { @@ -353,12 +353,13 @@ func NewCluster(fsmFn func() raft.FSM) *Cluster { logN := 3 for i := 0; i < logN; i++ { - l := NewLog(&url.URL{Host: fmt.Sprintf("log%d", i)}) + l := NewLog(url.URL{Host: fmt.Sprintf("log%d", i)}) l.Log.FSM = fsmFn() l.Transport = t c.Logs = append(c.Logs, l) t.register(l.Log) - warnf("Log %s: %p", l.URL.String(), l.Log) + u := l.URL() + warnf("Log %s: %p", u.String(), l.Log) } warn("") @@ -372,7 +373,7 @@ func NewCluster(fsmFn func() raft.FSM) *Cluster { c.Logs[0].MustWaitUncommitted(2) c.Logs[0].Clock.apply() }() - if err := c.Logs[1].Join(c.Logs[0].URL); err != nil { + if err := c.Logs[1].Join(c.Logs[0].URL()); err != nil { panic("join: " + err.Error()) } c.Logs[0].Clock.heartbeat() @@ -390,7 +391,7 @@ func NewCluster(fsmFn func() raft.FSM) *Cluster { c.Logs[1].Clock.apply() c.Logs[2].Clock.apply() }() - if err := c.Logs[2].Log.Join(c.Logs[0].Log.URL); err != nil { + if err := c.Logs[2].Log.Join(c.Logs[0].Log.URL()); err != nil { panic("join: " + err.Error()) } @@ -409,14 +410,15 @@ func NewRealTimeCluster(logN int, fsmFn func() raft.FSM) *Cluster { t := NewTransport() for i := 0; i < logN; i++ { - l := NewLog(&url.URL{Host: fmt.Sprintf("log%d", i)}) + l := NewLog(url.URL{Host: fmt.Sprintf("log%d", i)}) l.Log.FSM = fsmFn() l.Clock = nil l.Log.Clock = raft.NewClock() l.Transport = t c.Logs = append(c.Logs, l) t.register(l.Log) - warnf("Log %s: %p", l.URL.String(), l.Log) + u := l.URL() + warnf("Log %s: %p", u.String(), l.Log) } warn("") @@ -427,7 +429,7 @@ func NewRealTimeCluster(logN int, fsmFn func() raft.FSM) *Cluster { // Join remaining nodes. for i := 1; i < logN; i++ { c.Logs[i].MustOpen() - c.Logs[i].MustJoin(c.Logs[0].URL) + c.Logs[i].MustJoin(c.Logs[0].URL()) } // Ensure nodes are ready. @@ -494,9 +496,9 @@ type Log struct { } // NewLog returns a new instance of Log. -func NewLog(u *url.URL) *Log { +func NewLog(u url.URL) *Log { l := &Log{Log: raft.NewLog(), Clock: NewClock()} - l.URL = u + l.SetURL(u) l.Log.Clock = l.Clock l.Rand = seq() l.DebugEnabled = true @@ -507,7 +509,7 @@ func NewLog(u *url.URL) *Log { } // NewInitializedLog returns a new initialized Node. -func NewInitializedLog(u *url.URL) *Log { +func NewInitializedLog(u url.URL) *Log { l := NewLog(u) l.Log.FSM = &FSM{} l.MustOpen() @@ -536,7 +538,7 @@ func (l *Log) MustInitialize() { } // MustJoin joins the log to another log. Panic on error. -func (l *Log) MustJoin(u *url.URL) { +func (l *Log) MustJoin(u url.URL) { if err := l.Join(u); err != nil { panic("join: " + err.Error()) } @@ -552,21 +554,24 @@ func (l *Log) Close() error { // MustWaits waits for at least a given applied index. Panic on error. func (l *Log) MustWait(index uint64) { if err := l.Log.Wait(index); err != nil { - panic(l.URL.String() + " wait: " + err.Error()) + u := l.URL() + panic(u.String() + " wait: " + err.Error()) } } // MustCommitted waits for at least a given committed index. Panic on error. func (l *Log) MustWaitCommitted(index uint64) { if err := l.Log.WaitCommitted(index); err != nil { - panic(l.URL.String() + " wait committed: " + err.Error()) + u := l.URL() + panic(u.String() + " wait committed: " + err.Error()) } } // MustWaitUncommitted waits for at least a given uncommitted index. Panic on error. func (l *Log) MustWaitUncommitted(index uint64) { if err := l.Log.WaitUncommitted(index); err != nil { - panic(l.URL.String() + " wait uncommitted: " + err.Error()) + u := l.URL() + panic(u.String() + " wait uncommitted: " + err.Error()) } } diff --git a/raft/transport.go b/raft/transport.go index 5ea0c4d624..ab35f5ed36 100644 --- a/raft/transport.go +++ b/raft/transport.go @@ -15,9 +15,9 @@ import ( type HTTPTransport struct{} // Join requests membership into a node's cluster. -func (t *HTTPTransport) Join(uri *url.URL, nodeURL *url.URL) (uint64, uint64, *Config, error) { +func (t *HTTPTransport) Join(uri url.URL, nodeURL url.URL) (uint64, uint64, *Config, error) { // Construct URL. - u := *uri + u := uri u.Path = path.Join(u.Path, "raft/join") u.RawQuery = (&url.Values{"url": {nodeURL.String()}}).Encode() @@ -55,9 +55,9 @@ func (t *HTTPTransport) Join(uri *url.URL, nodeURL *url.URL) (uint64, uint64, *C } // Leave removes a node from a cluster's membership. -func (t *HTTPTransport) Leave(uri *url.URL, id uint64) error { +func (t *HTTPTransport) Leave(uri url.URL, id uint64) error { // Construct URL. - u := *uri + u := uri u.Path = path.Join(u.Path, "raft/leave") u.RawQuery = (&url.Values{"id": {strconv.FormatUint(id, 10)}}).Encode() @@ -77,9 +77,9 @@ func (t *HTTPTransport) Leave(uri *url.URL, id uint64) error { } // Heartbeat checks the status of a follower. -func (t *HTTPTransport) Heartbeat(uri *url.URL, term, commitIndex, leaderID uint64) (uint64, error) { +func (t *HTTPTransport) Heartbeat(uri url.URL, term, commitIndex, leaderID uint64) (uint64, error) { // Construct URL. - u := *uri + u := uri u.Path = path.Join(u.Path, "raft/heartbeat") // Set URL parameters. @@ -112,9 +112,9 @@ func (t *HTTPTransport) Heartbeat(uri *url.URL, term, commitIndex, leaderID uint } // ReadFrom streams the log from a leader. -func (t *HTTPTransport) ReadFrom(uri *url.URL, id, term, index uint64) (io.ReadCloser, error) { +func (t *HTTPTransport) ReadFrom(uri url.URL, id, term, index uint64) (io.ReadCloser, error) { // Construct URL. - u := *uri + u := uri u.Path = path.Join(u.Path, "raft/stream") // Set URL parameters. @@ -140,9 +140,9 @@ func (t *HTTPTransport) ReadFrom(uri *url.URL, id, term, index uint64) (io.ReadC } // RequestVote requests a vote for a candidate in a given term. -func (t *HTTPTransport) RequestVote(uri *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error { +func (t *HTTPTransport) RequestVote(uri url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error { // Construct URL. - u := *uri + u := uri u.Path = path.Join(u.Path, "raft/vote") // Set URL parameters. diff --git a/raft/transport_test.go b/raft/transport_test.go index 52618a57f5..9fdbe55b46 100644 --- a/raft/transport_test.go +++ b/raft/transport_test.go @@ -34,7 +34,7 @@ func TestHTTPTransport_Join(t *testing.T) { // Execute join against test server. u, _ := url.Parse(s.URL) - id, leaderID, config, err := (&raft.HTTPTransport{}).Join(u, &url.URL{Host: "local"}) + id, leaderID, config, err := (&raft.HTTPTransport{}).Join(*u, url.URL{Host: "local"}) if err != nil { t.Fatalf("unexpected error: %s", err) } else if id != 1 { @@ -48,7 +48,7 @@ func TestHTTPTransport_Join(t *testing.T) { // Ensure that joining a server that doesn't exist returns an error. func TestHTTPTransport_Join_ErrConnectionRefused(t *testing.T) { - _, _, _, err := (&raft.HTTPTransport{}).Join(&url.URL{Scheme: "http", Host: "localhost:27322"}, &url.URL{Host: "local"}) + _, _, _, err := (&raft.HTTPTransport{}).Join(url.URL{Scheme: "http", Host: "localhost:27322"}, url.URL{Host: "local"}) if err == nil || !strings.Contains(err.Error(), "connection refused") { t.Fatalf("unexpected error: %s", err) } @@ -64,7 +64,7 @@ func TestHTTPTransport_Join_ErrInvalidID(t *testing.T) { // Execute join against test server. u, _ := url.Parse(s.URL) - _, _, _, err := (&raft.HTTPTransport{}).Join(u, &url.URL{Host: "local"}) + _, _, _, err := (&raft.HTTPTransport{}).Join(*u, url.URL{Host: "local"}) if err == nil || err.Error() != `invalid id: "xxx"` { t.Fatalf("unexpected error: %s", err) } @@ -82,7 +82,7 @@ func TestHTTPTransport_Join_ErrInvalidConfig(t *testing.T) { // Execute join against test server. u, _ := url.Parse(s.URL) - _, _, _, err := (&raft.HTTPTransport{}).Join(u, &url.URL{Host: "local"}) + _, _, _, err := (&raft.HTTPTransport{}).Join(*u, url.URL{Host: "local"}) if err == nil || err.Error() != `config unmarshal: unexpected EOF` { t.Fatalf("unexpected error: %s", err) } @@ -99,7 +99,7 @@ func TestHTTPTransport_Join_Err(t *testing.T) { // Execute join against test server. u, _ := url.Parse(s.URL) - _, _, _, err := (&raft.HTTPTransport{}).Join(u, &url.URL{Host: "local"}) + _, _, _, err := (&raft.HTTPTransport{}).Join(*u, url.URL{Host: "local"}) if err == nil || err.Error() != `oh no` { t.Fatalf("unexpected error: %s", err) } @@ -119,14 +119,14 @@ func TestHTTPTransport_Leave(t *testing.T) { // Execute leave against test server. u, _ := url.Parse(s.URL) - if err := (&raft.HTTPTransport{}).Leave(u, 1); err != nil { + if err := (&raft.HTTPTransport{}).Leave(*u, 1); err != nil { t.Fatalf("unexpected error: %s", err) } } // Ensure that leaving a server that doesn't exist returns an error. func TestHTTPTransport_Leave_ErrConnectionRefused(t *testing.T) { - err := (&raft.HTTPTransport{}).Leave(&url.URL{Scheme: "http", Host: "localhost:27322"}, 1) + err := (&raft.HTTPTransport{}).Leave(url.URL{Scheme: "http", Host: "localhost:27322"}, 1) if err == nil || !strings.Contains(err.Error(), "connection refused") { t.Fatalf("unexpected error: %s", err) } @@ -142,7 +142,7 @@ func TestHTTPTransport_Leave_Err(t *testing.T) { // Execute leave against test server. u, _ := url.Parse(s.URL) - err := (&raft.HTTPTransport{}).Leave(u, 1) + err := (&raft.HTTPTransport{}).Leave(*u, 1) if err == nil || err.Error() != `oh no` { t.Fatalf("unexpected error: %s", err) } @@ -171,7 +171,7 @@ func TestHTTPTransport_Heartbeat(t *testing.T) { // Execute heartbeat against test server. u, _ := url.Parse(s.URL) - newIndex, err := (&raft.HTTPTransport{}).Heartbeat(u, 1, 2, 3) + newIndex, err := (&raft.HTTPTransport{}).Heartbeat(*u, 1, 2, 3) if err != nil { t.Fatalf("unexpected error: %s", err) } else if newIndex != 4 { @@ -198,7 +198,7 @@ func TestHTTPTransport_Heartbeat_Err(t *testing.T) { })) u, _ := url.Parse(s.URL) - _, err := (&raft.HTTPTransport{}).Heartbeat(u, 1, 2, 3) + _, err := (&raft.HTTPTransport{}).Heartbeat(*u, 1, 2, 3) if err == nil { t.Errorf("%d. expected error", i) } else if tt.err != err.Error() { @@ -211,7 +211,7 @@ func TestHTTPTransport_Heartbeat_Err(t *testing.T) { // Ensure an HTTP heartbeat to a stopped server returns an error. func TestHTTPTransport_Heartbeat_ErrConnectionRefused(t *testing.T) { u, _ := url.Parse("http://localhost:41932") - _, err := (&raft.HTTPTransport{}).Heartbeat(u, 0, 0, 0) + _, err := (&raft.HTTPTransport{}).Heartbeat(*u, 0, 0, 0) if err == nil { t.Fatal("expected error") } else if !strings.Contains(err.Error(), `connection refused`) { @@ -241,7 +241,7 @@ func TestHTTPTransport_ReadFrom(t *testing.T) { // Execute stream against test server. u, _ := url.Parse(s.URL) - r, err := (&raft.HTTPTransport{}).ReadFrom(u, 1, 2, 3) + r, err := (&raft.HTTPTransport{}).ReadFrom(*u, 1, 2, 3) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -261,7 +261,7 @@ func TestHTTPTransport_ReadFrom_Err(t *testing.T) { // Execute stream against test server. u, _ := url.Parse(s.URL) - r, err := (&raft.HTTPTransport{}).ReadFrom(u, 0, 0, 0) + r, err := (&raft.HTTPTransport{}).ReadFrom(*u, 0, 0, 0) if err == nil { t.Fatalf("expected error") } else if err.Error() != `bad stream` { @@ -274,7 +274,7 @@ func TestHTTPTransport_ReadFrom_Err(t *testing.T) { // Ensure an streaming over HTTP to a stopped server returns an error. func TestHTTPTransport_ReadFrom_ErrConnectionRefused(t *testing.T) { u, _ := url.Parse("http://localhost:41932") - _, err := (&raft.HTTPTransport{}).ReadFrom(u, 0, 0, 0) + _, err := (&raft.HTTPTransport{}).ReadFrom(*u, 0, 0, 0) if err == nil { t.Fatal("expected error") } else if !strings.Contains(err.Error(), `connection refused`) { @@ -307,7 +307,7 @@ func TestHTTPTransport_RequestVote(t *testing.T) { // Execute heartbeat against test server. u, _ := url.Parse(s.URL) - if err := (&raft.HTTPTransport{}).RequestVote(u, 1, 2, 3, 4); err != nil { + if err := (&raft.HTTPTransport{}).RequestVote(*u, 1, 2, 3, 4); err != nil { t.Fatalf("unexpected error: %s", err) } } @@ -322,7 +322,7 @@ func TestHTTPTransport_RequestVote_Error(t *testing.T) { defer s.Close() u, _ := url.Parse(s.URL) - if err := (&raft.HTTPTransport{}).RequestVote(u, 0, 0, 0, 0); err == nil { + if err := (&raft.HTTPTransport{}).RequestVote(*u, 0, 0, 0, 0); err == nil { t.Errorf("expected error") } else if err.Error() != `already voted` { t.Errorf("unexpected error: %s", err) @@ -332,7 +332,7 @@ func TestHTTPTransport_RequestVote_Error(t *testing.T) { // Ensure that requesting a vote over HTTP to a stopped server returns an error. func TestHTTPTransport_RequestVote_ErrConnectionRefused(t *testing.T) { u, _ := url.Parse("http://localhost:41932") - if err := (&raft.HTTPTransport{}).RequestVote(u, 0, 0, 0, 0); err == nil { + if err := (&raft.HTTPTransport{}).RequestVote(*u, 0, 0, 0, 0); err == nil { t.Fatal("expected error") } else if !strings.Contains(err.Error(), `connection refused`) { t.Fatalf("unexpected error: %s", err) @@ -352,11 +352,11 @@ func NewTransport() *Transport { // register registers a log by hostname. func (t *Transport) register(l *raft.Log) { - t.logs[l.URL.Host] = l + t.logs[l.URL().Host] = l } // log returns a log registered by hostname. -func (t *Transport) log(u *url.URL) (*raft.Log, error) { +func (t *Transport) log(u url.URL) (*raft.Log, error) { if l := t.logs[u.Host]; l != nil { return l, nil } @@ -364,7 +364,7 @@ func (t *Transport) log(u *url.URL) (*raft.Log, error) { } // Join calls the AddPeer method on the target log. -func (t *Transport) Join(u *url.URL, nodeURL *url.URL) (uint64, uint64, *raft.Config, error) { +func (t *Transport) Join(u url.URL, nodeURL url.URL) (uint64, uint64, *raft.Config, error) { l, err := t.log(u) if err != nil { return 0, 0, nil, err @@ -373,7 +373,7 @@ func (t *Transport) Join(u *url.URL, nodeURL *url.URL) (uint64, uint64, *raft.Co } // Leave calls the RemovePeer method on the target log. -func (t *Transport) Leave(u *url.URL, id uint64) error { +func (t *Transport) Leave(u url.URL, id uint64) error { l, err := t.log(u) if err != nil { return err @@ -382,7 +382,7 @@ func (t *Transport) Leave(u *url.URL, id uint64) error { } // Heartbeat calls the Heartbeat method on the target log. -func (t *Transport) Heartbeat(u *url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error) { +func (t *Transport) Heartbeat(u url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error) { l, err := t.log(u) if err != nil { return 0, err @@ -391,7 +391,7 @@ func (t *Transport) Heartbeat(u *url.URL, term, commitIndex, leaderID uint64) (l } // ReadFrom streams entries from the target log. -func (t *Transport) ReadFrom(u *url.URL, id, term, index uint64) (io.ReadCloser, error) { +func (t *Transport) ReadFrom(u url.URL, id, term, index uint64) (io.ReadCloser, error) { l, err := t.log(u) if err != nil { return nil, err @@ -409,7 +409,7 @@ func (t *Transport) ReadFrom(u *url.URL, id, term, index uint64) (io.ReadCloser, } // RequestVote calls RequestVote() on the target log. -func (t *Transport) RequestVote(u *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error { +func (t *Transport) RequestVote(u url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error { l, err := t.log(u) if err != nil { return err diff --git a/server.go b/server.go index 7d617e1326..212136c147 100644 --- a/server.go +++ b/server.go @@ -355,7 +355,7 @@ func (s *Server) setClient(client MessagingClient) error { if client != nil { // Create connection for broadcast channel. conn := client.Conn(BroadcastTopicID) - if err := conn.Open(s.index); err != nil { + if err := conn.Open(s.index, true); err != nil { return fmt.Errorf("open conn: %s", err) } @@ -417,16 +417,16 @@ func (s *Server) Sync(index uint64) error { } // Initialize creates a new data node and initializes the server's id to 1. -func (s *Server) Initialize(u *url.URL) error { +func (s *Server) Initialize(u url.URL) error { // Create a new data node. - if err := s.CreateDataNode(u); err != nil { + if err := s.CreateDataNode(&u); err != nil { return err } // Ensure the data node returns with an ID of 1. // If it doesn't then something went really wrong. We have to panic because // the messaging client relies on the first server being assigned ID 1. - n := s.DataNodeByURL(u) + n := s.DataNodeByURL(&u) assert(n != nil && n.ID == 1, "invalid initial server id: %d", n.ID) // Set the ID on the metastore. @@ -2816,16 +2816,33 @@ func (r *Results) Error() error { // MessagingClient represents the client used to connect to brokers. type MessagingClient interface { + Open(path string, urls []url.URL) error + Close() error + // Publishes a message to the broker. Publish(m *messaging.Message) (index uint64, err error) // Conn returns an open, streaming connection to a topic. Conn(topicID uint64) MessagingConn + + // Sets the logging destination. + SetLogOutput(w io.Writer) } +type messagingClient struct { + *messaging.Client +} + +// NewMessagingClient returns an instance of MessagingClient. +func NewMessagingClient() MessagingClient { + return &messagingClient{messaging.NewClient()} +} + +func (c *messagingClient) Conn(topicID uint64) MessagingConn { return c.Client.Conn(topicID) } + // MessagingConn represents a streaming connection to a single broker topic. type MessagingConn interface { - Open(index uint64) error + Open(index uint64, streaming bool) error C() <-chan *messaging.Message } diff --git a/server_test.go b/server_test.go index 8ee3d258a1..9235fcaa20 100644 --- a/server_test.go +++ b/server_test.go @@ -10,13 +10,12 @@ import ( "os" "reflect" "strings" - "sync" "testing" "time" "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/influxql" - "github.com/influxdb/influxdb/messaging" + "github.com/influxdb/influxdb/test" "golang.org/x/crypto/bcrypt" ) @@ -40,7 +39,7 @@ func TestServer_Open_ErrPathRequired(t *testing.T) { t.Skip("pending") } // Ensure the server can create a new data node. func TestServer_CreateDataNode(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -64,7 +63,7 @@ func TestServer_CreateDataNode(t *testing.T) { // Ensure the server returns an error when creating a duplicate node. func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -81,7 +80,7 @@ func TestServer_CreateDatabase_ErrDataNodeExists(t *testing.T) { // Ensure the server can delete a node. func TestServer_DeleteDataNode(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -106,7 +105,7 @@ func TestServer_DeleteDataNode(t *testing.T) { // Test unuathorized requests logging func TestServer_UnauthorizedRequests(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -149,7 +148,7 @@ func TestServer_UnauthorizedRequests(t *testing.T) { // Test user privilege authorization. func TestServer_UserPrivilegeAuthorization(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -186,7 +185,7 @@ func TestServer_UserPrivilegeAuthorization(t *testing.T) { // Test single statement query authorization. func TestServer_SingleStatementQueryAuthorization(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -255,7 +254,7 @@ func TestServer_SingleStatementQueryAuthorization(t *testing.T) { // Test multiple statement query authorization. func TestServer_MultiStatementQueryAuthorization(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -302,7 +301,7 @@ func TestServer_MultiStatementQueryAuthorization(t *testing.T) { // Ensure the server can create a database. func TestServer_CreateDatabase(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -321,7 +320,7 @@ func TestServer_CreateDatabase(t *testing.T) { // Ensure the server returns an error when creating a duplicate database. func TestServer_CreateDatabase_ErrDatabaseExists(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -337,7 +336,7 @@ func TestServer_CreateDatabase_ErrDatabaseExists(t *testing.T) { // Ensure the server can drop a database. func TestServer_DropDatabase(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -360,7 +359,7 @@ func TestServer_DropDatabase(t *testing.T) { // Ensure the server returns an error when dropping a database that doesn't exist. func TestServer_DropDatabase_ErrDatabaseNotFound(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -373,7 +372,7 @@ func TestServer_DropDatabase_ErrDatabaseNotFound(t *testing.T) { // Ensure the server can return a list of all databases. func TestServer_Databases(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -395,7 +394,7 @@ func TestServer_Databases(t *testing.T) { // Ensure the server can create a new user. func TestServer_CreateUser(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -433,7 +432,7 @@ func TestServer_CreateUser(t *testing.T) { // Ensure the server correctly detects when there is an admin user. func TestServer_AdminUserExists(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -464,7 +463,7 @@ func TestServer_AdminUserExists(t *testing.T) { // Ensure the server returns an error when creating an user without a name. func TestServer_CreateUser_ErrUsernameRequired(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -475,7 +474,7 @@ func TestServer_CreateUser_ErrUsernameRequired(t *testing.T) { // Ensure the server returns an error when creating a duplicate user. func TestServer_CreateUser_ErrUserExists(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -489,7 +488,7 @@ func TestServer_CreateUser_ErrUserExists(t *testing.T) { // Ensure the server can delete an existing user. func TestServer_DeleteUser(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -516,7 +515,7 @@ func TestServer_DeleteUser(t *testing.T) { // Ensure the server can return a list of all users. func TestServer_Users(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -538,7 +537,7 @@ func TestServer_Users(t *testing.T) { // Ensure the server does not return non-existent users func TestServer_NonExistingUsers(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -561,7 +560,7 @@ func TestServer_NonExistingUsers(t *testing.T) { // Ensure the database can create a new retention policy. func TestServer_CreateRetentionPolicy(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -594,7 +593,7 @@ func TestServer_CreateRetentionPolicy(t *testing.T) { // Ensure the server returns an error when creating a retention policy with an invalid db. func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -605,7 +604,7 @@ func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { // Ensure the server returns an error when creating a retention policy without a name. func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -617,7 +616,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing. // Ensure the server returns an error when creating a duplicate retention policy. func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -630,7 +629,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) { // Ensure the database can alter an existing retention policy. func TestServer_AlterRetentionPolicy(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -696,7 +695,7 @@ func TestServer_AlterRetentionPolicy(t *testing.T) { // Ensure the server can delete an existing retention policy. func TestServer_DeleteRetentionPolicy(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -724,7 +723,7 @@ func TestServer_DeleteRetentionPolicy(t *testing.T) { // Ensure the server returns an error when deleting a retention policy on invalid db. func TestServer_DeleteRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -735,7 +734,7 @@ func TestServer_DeleteRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { // Ensure the server returns an error when deleting a retention policy without a name. func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -747,7 +746,7 @@ func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing. // Ensure the server returns an error when deleting a non-existent retention policy. func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -759,7 +758,7 @@ func TestServer_DeleteRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) { // Ensure the server can set the default retention policy func TestServer_SetDefaultRetentionPolicy(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -794,7 +793,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) { // Ensure the server returns an error when setting the default retention policy to a non-existant one. func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -806,7 +805,7 @@ func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing. // Ensure the server prohibits a zero check interval for retention policy enforcement. func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -816,7 +815,7 @@ func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) { } func TestServer_EnforceRetentionPolices(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -853,7 +852,7 @@ func TestServer_EnforceRetentionPolices(t *testing.T) { // Ensure the database can write data to the database. func TestServer_WriteSeries(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -867,7 +866,6 @@ func TestServer_WriteSeries(t *testing.T) { t.Fatal(err) } c.Sync(index) - warn("A") // Write another point 10 seconds later so it goes through "raw series". index, err = s.WriteSeries("foo", "mypolicy", []influxdb.Point{{Name: "cpu_load", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:10Z"), Fields: map[string]interface{}{"value": float64(100)}}}) @@ -875,7 +873,6 @@ func TestServer_WriteSeries(t *testing.T) { t.Fatal(err) } c.Sync(index) - warn("B") // Retrieve first series data point. if v, err := s.ReadSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:00Z")); err != nil { @@ -901,7 +898,7 @@ func TestServer_WriteSeries(t *testing.T) { // Ensure the server can drop a measurement. func TestServer_DropMeasurement(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -964,7 +961,7 @@ func TestServer_DropMeasurement(t *testing.T) { // Ensure the server can handles drop measurement if none exists. func TestServer_DropMeasurementNoneExists(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1004,7 +1001,7 @@ func TestServer_DropMeasurementNoneExists(t *testing.T) { // select * from memory where host=serverb // select * from memory where region=uswest func TestServer_DropMeasurementSeriesTagsPreserved(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1118,7 +1115,7 @@ func TestServer_DropMeasurementSeriesTagsPreserved(t *testing.T) { // Ensure the server can drop a series. func TestServer_DropSeries(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1162,7 +1159,7 @@ func TestServer_DropSeries(t *testing.T) { // Ensure the server can drop a series from measurement when more than one shard exists. func TestServer_DropSeriesFromMeasurement(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1204,7 +1201,7 @@ func TestServer_DropSeriesFromMeasurement(t *testing.T) { // Ensure that when merging many series together and some of them have a different number of points than others // in a group by interval the results are correct func TestServer_MergeManySeries(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1239,7 +1236,7 @@ func TestServer_MergeManySeries(t *testing.T) { // ensure that the dropped series is gone // ensure that we can still query: select value from cpu where region=uswest func TestServer_DropSeriesTagsPreserved(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") @@ -1313,7 +1310,7 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) { // Ensure the server can execute a query and return the data correctly. func TestServer_ExecuteQuery(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1393,7 +1390,7 @@ func TestServer_ExecuteQuery(t *testing.T) { // Ensure the server respects limit and offset in show series queries func TestServer_ShowSeriesLimitOffset(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1449,7 +1446,7 @@ func TestServer_ShowSeriesLimitOffset(t *testing.T) { // Ensure that when querying for raw data values that they return in time order func TestServer_RawDataReturnsInOrder(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1496,7 +1493,7 @@ func TestServer_RawDataReturnsInOrder(t *testing.T) { // Ensure that limit and offset work func TestServer_LimitAndOffset(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1545,7 +1542,7 @@ func TestServer_LimitAndOffset(t *testing.T) { // Ensure the server can execute a wildcard query and return the data correctly. func TestServer_ExecuteWildcardQuery(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1571,7 +1568,7 @@ func TestServer_ExecuteWildcardQuery(t *testing.T) { // Ensure the server can execute a wildcard GROUP BY func TestServer_ExecuteWildcardGroupBy(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1604,7 +1601,7 @@ func TestServer_ExecuteWildcardGroupBy(t *testing.T) { } func TestServer_CreateShardGroupIfNotExist(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1629,7 +1626,7 @@ func TestServer_CreateShardGroupIfNotExist(t *testing.T) { } func TestServer_DeleteShardGroup(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1667,7 +1664,7 @@ func TestServer_DeleteShardGroup(t *testing.T) { /* TODO(benbjohnson): Change test to not expose underlying series ids directly. func TestServer_Measurements(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1738,7 +1735,7 @@ func TestServer_NormalizeMeasurement(t *testing.T) { } // Create server with a variety of databases, retention policies, and measurements - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1787,7 +1784,7 @@ func TestServer_NormalizeQuery(t *testing.T) { } // Start server with database & retention policy. - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1809,7 +1806,7 @@ func TestServer_NormalizeQuery(t *testing.T) { // Ensure the server can create a continuous query func TestServer_CreateContinuousQuery(t *testing.T) { - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -1871,7 +1868,7 @@ func TestServer_CreateContinuousQuery_ErrInfinteLoop(t *testing.T) { // Ensure func TestServer_RunContinuousQueries(t *testing.T) { t.Skip() - c := NewMessagingClient() + c := test.NewMessagingClient() defer c.Close() s := OpenServer(c) defer s.Close() @@ -2019,7 +2016,7 @@ func NewServer() *Server { // OpenServer returns a new, open test server instance. func OpenServer(client influxdb.MessagingClient) *Server { s := OpenUninitializedServer(client) - if err := s.Initialize(&url.URL{Host: "127.0.0.1:8080"}); err != nil { + if err := s.Initialize(url.URL{Host: "127.0.0.1:8080"}); err != nil { panic(err.Error()) } return s @@ -2081,147 +2078,10 @@ func (s *Server) MustWriteSeries(database, retentionPolicy string, points []infl if err != nil { panic(err.Error()) } - s.Client().(*MessagingClient).Sync(index) + s.Client().(*test.MessagingClient).Sync(index) return index } -// MessagingClient represents a test client for the messaging broker. -type MessagingClient struct { - mu sync.Mutex - index uint64 // highest index - conns []*MessagingConn // list of all connections - - messagesByTopicID map[uint64][]*messaging.Message // message by topic - - PublishFunc func(*messaging.Message) (uint64, error) - ConnFunc func(topicID uint64) influxdb.MessagingConn -} - -// NewMessagingClient returns a new instance of MessagingClient. -func NewMessagingClient() *MessagingClient { - c := &MessagingClient{ - messagesByTopicID: make(map[uint64][]*messaging.Message), - } - c.PublishFunc = c.DefaultPublishFunc - c.ConnFunc = c.DefaultConnFunc - return c -} - -// Close closes all open connections. -func (c *MessagingClient) Close() error { - c.mu.Lock() - defer c.mu.Unlock() - - for _, conn := range c.conns { - conn.Close() - } - - return nil -} - -func (c *MessagingClient) Publish(m *messaging.Message) (uint64, error) { return c.PublishFunc(m) } - -// DefaultPublishFunc sets an autoincrementing index on the message and sends it to each topic connection. -func (c *MessagingClient) DefaultPublishFunc(m *messaging.Message) (uint64, error) { - c.mu.Lock() - defer c.mu.Unlock() - - // Increment index and assign it to message. - c.index++ - m.Index = c.index - - // Append message to the topic. - c.messagesByTopicID[m.TopicID] = append(c.messagesByTopicID[m.TopicID], m) - - // Send to each connection for the topic. - for _, conn := range c.conns { - if conn.topicID == m.TopicID { - conn.Send(m) - } - } - - return m.Index, nil -} - -func (c *MessagingClient) Conn(topicID uint64) influxdb.MessagingConn { - return c.ConnFunc(topicID) -} - -// DefaultConnFunc returns a connection for a specific topic. -func (c *MessagingClient) DefaultConnFunc(topicID uint64) influxdb.MessagingConn { - c.mu.Lock() - defer c.mu.Unlock() - - // Create new connection. - conn := NewMessagingConn(topicID) - - // Track connections. - c.conns = append(c.conns, conn) - - return conn -} - -// Sync blocks until a given index has been sent through the client. -func (c *MessagingClient) Sync(index uint64) { - for { - c.mu.Lock() - if c.index >= index { - c.mu.Unlock() - time.Sleep(10 * time.Millisecond) - return - } - c.mu.Unlock() - - // Otherwise wait momentarily and check again. - time.Sleep(1 * time.Millisecond) - } -} - -// MessagingConn represents a mockable connection implementing influxdb.MessagingConn. -type MessagingConn struct { - mu sync.Mutex - topicID uint64 - index uint64 - c chan *messaging.Message -} - -// NewMessagingConn returns a new instance of MessagingConn. -func NewMessagingConn(topicID uint64) *MessagingConn { - return &MessagingConn{ - topicID: topicID, - } -} - -// Open starts the stream from a given index. -func (c *MessagingConn) Open(index uint64) error { - // TODO: Fill connection stream with existing messages. - c.c = make(chan *messaging.Message, 1024) - return nil -} - -// Close closes the streaming channel. -func (c *MessagingConn) Close() error { - close(c.c) - return nil -} - -// C returns a channel for streaming message. -func (c *MessagingConn) C() <-chan *messaging.Message { return c.c } - -func (c *MessagingConn) Send(m *messaging.Message) { - // Ignore any old messages. - c.mu.Lock() - if m.Index <= c.index { - c.mu.Unlock() - return - } - c.index = m.Index - c.mu.Unlock() - - // Send message to channel. - c.c <- m -} - // tempfile returns a temporary path. func tempfile() string { f, _ := ioutil.TempFile("", "influxdb-") diff --git a/shard.go b/shard.go index a8dcd459ce..fa6635e2fa 100644 --- a/shard.go +++ b/shard.go @@ -100,7 +100,7 @@ func (s *Shard) open(path string, conn MessagingConn) error { } // Open connection. - if err := conn.Open(s.index); err != nil { + if err := conn.Open(s.index, true); err != nil { _ = s.close() return fmt.Errorf("open shard conn: id=%d, idx=%d, err=%s", s.ID, s.index, err) } diff --git a/tx_test.go b/tx_test.go index 4d9815a8c8..12b8d0d680 100644 --- a/tx_test.go +++ b/tx_test.go @@ -7,12 +7,15 @@ import ( "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/influxql" + "github.com/influxdb/influxdb/test" ) // Ensure a transaction can retrieve a list of iterators for a simple SELECT statement. func TestTx_CreateIterators(t *testing.T) { t.Skip() - s := OpenDefaultServer(NewMessagingClient()) + c := test.NewMessagingClient() + defer c.Close() + s := OpenDefaultServer(c) defer s.Close() // Write to us-east