diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 8b87d5416b..bb01e416a3 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -300,10 +300,20 @@ func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, conf // Create messaging client to the brokers. c := influxdb.NewMessagingClient() c.SetLogOutput(w) - if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile), clientJoinURLs); err != nil { + if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile)); err != nil { log.Fatalf("messaging client error: %s", err) } + // If join URLs were passed in then use them to override the client's URLs. + if len(clientJoinURLs) > 0 { + c.SetURLs(clientJoinURLs) + } + + // If no URLs exist on the client the return an error since we cannot reach a broker. + if len(c.URLs()) == 0 { + log.Fatal("messaging client has no broker URLs") + } + // Create and open the server. s := influxdb.NewServer() s.SetLogOutput(w) diff --git a/messaging/broker.go b/messaging/broker.go index 14f2abf40b..e584d53a1d 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -36,7 +36,9 @@ type Broker struct { // Log is the distributed raft log that commands are applied to. Log interface { URL() url.URL + URLs() []url.URL Leader() (uint64, url.URL) + IsLeader() bool ClusterID() uint64 Apply(data []byte) (index uint64, err error) } @@ -68,6 +70,12 @@ func (b *Broker) metaPath() string { // URL returns the URL of the broker. func (b *Broker) URL() url.URL { return b.Log.URL() } +// URLs returns a list of all broker URLs in the cluster. +func (b *Broker) URLs() []url.URL { return b.Log.URLs() } + +// IsLeader returns true if the broker is the current cluster leader. +func (b *Broker) IsLeader() bool { return b.Log.IsLeader() } + // LeaderURL returns the URL to the leader broker. func (b *Broker) LeaderURL() url.URL { _, u := b.Log.Leader() diff --git a/messaging/broker_test.go b/messaging/broker_test.go index edb03eec89..80a15f97ff 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -707,14 +707,18 @@ func (b *Broker) MustReadAllTopic(topicID uint64) (a []*messaging.Message) { type BrokerLog struct { ApplyFunc func(data []byte) (uint64, error) ClusterIDFunc func() uint64 + IsLeaderFunc func() bool LeaderFunc func() (uint64, url.URL) URLFunc func() url.URL + URLsFunc func() []url.URL } func (l *BrokerLog) Apply(data []byte) (uint64, error) { return l.ApplyFunc(data) } func (l *BrokerLog) ClusterID() uint64 { return l.ClusterIDFunc() } +func (l *BrokerLog) IsLeader() bool { return l.IsLeaderFunc() } func (l *BrokerLog) Leader() (uint64, url.URL) { return l.LeaderFunc() } func (l *BrokerLog) URL() url.URL { return l.URLFunc() } +func (l *BrokerLog) URLs() []url.URL { return l.URLsFunc() } // Messages represents a collection of messages. // This type provides helper functions. diff --git a/messaging/client.go b/messaging/client.go index e04a5b4f3b..b9f64a2ff6 100644 --- a/messaging/client.go +++ b/messaging/client.go @@ -17,17 +17,20 @@ import ( "time" ) -// DefaultReconnectTimeout is the default time to wait between when a broker -// stream disconnects and another connection is retried. -const DefaultReconnectTimeout = 1000 * time.Millisecond +const ( + // DefaultReconnectTimeout is the default time to wait between when a broker + // stream disconnects and another connection is retried. + DefaultReconnectTimeout = 1000 * time.Millisecond -// DefaultPingInterval is the default time to wait between checks to the broker. -const DefaultPingInterval = 1000 * time.Millisecond + // DefaultPingInterval is the default time to wait between checks to the broker. + DefaultPingInterval = 1000 * time.Millisecond +) // Client represents a client for the broker's HTTP API. type Client struct { mu sync.Mutex - conns []*Conn + path string // config file path + conns []*Conn // all connections opened by client url url.URL // current known leader URL urls []url.URL // list of available broker URLs @@ -80,10 +83,27 @@ func (c *Client) setURL(u url.URL) { } } -// RandomizeURL sets a random URL from the configuration. -func (c *Client) RandomizeURL() { +// URLs returns a list of possible broker URLs to connect to. +func (c *Client) URLs() []url.URL { c.mu.Lock() defer c.mu.Unlock() + return c.urls +} + +// SetURLs sets a list of possible URLs to connect to for the client and its connections. +func (c *Client) SetURLs(a []url.URL) { + c.mu.Lock() + defer c.mu.Unlock() + c.setURLs(a) +} + +func (c *Client) setURLs(a []url.URL) { + // Ignore if the URL list is the same. + if urlsEqual(c.urls, a) { + return + } + + c.urls = a c.randomizeURL() } @@ -102,8 +122,8 @@ func (c *Client) SetLogOutput(w io.Writer) { c.Logger = log.New(w, "[messaging] ", log.LstdFlags) } -// Open reads the configuration from the specified path or uses the URLs provided. -func (c *Client) Open(path string, urls []url.URL) error { +// Open opens the client and reads the configuration from the specified path. +func (c *Client) Open(path string) error { c.mu.Lock() defer c.mu.Unlock() @@ -113,29 +133,12 @@ func (c *Client) Open(path string, urls []url.URL) error { } // Read URLs from file if no URLs are provided. - if len(urls) == 0 { - if b, err := ioutil.ReadFile(path); os.IsNotExist(err) { - // nop - } else if err != nil { - return err - } else { - var config ClientConfig - if err := json.Unmarshal(b, &config); err != nil { - return err - } - c.urls = config.Brokers - } + c.path = path + if err := c.loadConfig(); err != nil { + _ = c.close() + return fmt.Errorf("load config: %s", err) } - // Ensure we have at least one URL. - if len(urls) < 1 { - return ErrBrokerURLRequired - } - - // Set the URLs whether they're from the config or passed in. - c.urls = urls - c.randomizeURL() - // Set open flag. c.opened = true @@ -151,7 +154,10 @@ func (c *Client) Open(path string, urls []url.URL) error { func (c *Client) Close() error { c.mu.Lock() defer c.mu.Unlock() + return c.close() +} +func (c *Client) close() error { // Return error if the client is already closed. if !c.opened { return ErrClientClosed @@ -180,6 +186,53 @@ func (c *Client) Close() error { return nil } +// loadConfig reads the configuration from disk and sets the options on the client. +func (c *Client) loadConfig() error { + // Open config file for reading. + f, err := os.Open(c.path) + if os.IsNotExist(err) { + c.urls = nil + return nil + } else if err != nil { + return fmt.Errorf("open config: %s", err) + } + defer f.Close() + + // Decode config from file. + var config ClientConfig + if err := json.NewDecoder(f).Decode(&config); err != nil { + return fmt.Errorf("decode config: %s", err) + } + + // Set options. + c.urls = config.URLs + + return nil +} + +// setConfig writes a new config to disk and updates urls on the client. +func (c *Client) setConfig(config ClientConfig) error { + // Only write to disk if we have a path. + if c.path != "" { + // Open config file for writing. + f, err := os.Create(c.path) + if err != nil { + return fmt.Errorf("create: %s", err) + } + defer f.Close() + + // Encode config to file. + if err := json.NewEncoder(f).Encode(&config); err != nil { + return fmt.Errorf("encode config: %s", err) + } + } + + // Set options. + c.urls = config.URLs + + return nil +} + // Publish sends a message to the broker and returns an index or error. func (c *Client) Publish(m *Message) (uint64, error) { // Post message to broker. @@ -198,7 +251,7 @@ func (c *Client) Publish(m *Message) (uint64, error) { if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" { return 0, errors.New(errstr) } - return 0, fmt.Errorf("cannot publish(%d)", resp.StatusCode) + return 0, fmt.Errorf("cannot publish: status=%d", resp.StatusCode) } // Parse broker index. @@ -218,7 +271,26 @@ func (c *Client) Ping() error { if err != nil { return err } - resp.Body.Close() + defer resp.Body.Close() + + // Read entire body. + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("read ping body: %s", err) + } + + // Update config if body is passed back. + if len(b) != 0 { + var config ClientConfig + if err := json.Unmarshal(b, &config); err != nil { + return fmt.Errorf("unmarshal config: %s", err) + } + + if err := c.setConfig(config); err != nil { + return fmt.Errorf("update config: %s", err) + } + } + return nil } @@ -293,15 +365,39 @@ func (c *Client) pinger(closing chan struct{}) { // ClientConfig represents the configuration that must be persisted across restarts. type ClientConfig struct { - Brokers []url.URL `json:"brokers"` - Leader url.URL `json:"leader"` + URLs []url.URL } -// NewClientConfig returns a new instance of ClientConfig. -func NewClientConfig(u []url.URL) *ClientConfig { - return &ClientConfig{ - Brokers: u, +func (c ClientConfig) MarshalJSON() ([]byte, error) { + var other clientConfigJSON + other.URLs = make([]string, len(c.URLs)) + for i, u := range c.URLs { + other.URLs[i] = u.String() } + return json.Marshal(&other) +} + +func (c *ClientConfig) UnmarshalJSON(b []byte) error { + var other clientConfigJSON + if err := json.Unmarshal(b, &other); err != nil { + return err + } + + c.URLs = make([]url.URL, len(other.URLs)) + for i := range other.URLs { + u, err := url.Parse(other.URLs[i]) + if err != nil { + return err + } + c.URLs[i] = *u + } + + return nil +} + +// clientConfigJSON represents the JSON +type clientConfigJSON struct { + URLs []string `json:"urls"` } // Conn represents a stream over the client for a single topic. @@ -465,7 +561,7 @@ func (c *Conn) Heartbeat() error { if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" { return errors.New(errstr) } - return fmt.Errorf("heartbeat error: %d", resp.StatusCode) + return fmt.Errorf("heartbeat error: status=%d", resp.StatusCode) } return nil } @@ -563,3 +659,16 @@ func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error { } } } + +// urlsEqual returns true if a and b contain the same URLs in the same order. +func urlsEqual(a, b []url.URL) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/messaging/client_test.go b/messaging/client_test.go index 613fc76db3..8feadacc21 100644 --- a/messaging/client_test.go +++ b/messaging/client_test.go @@ -1,16 +1,323 @@ package messaging_test import ( + "encoding/json" "io/ioutil" "net/http" "net/http/httptest" "net/url" + "os" "reflect" + "strings" "testing" "github.com/influxdb/influxdb/messaging" ) +// Ensure a client can open the configuration file, if it exists. +func TestClient_Open_WithConfig(t *testing.T) { + // Write configuration file. + path := NewTempFile() + defer os.Remove(path) + MustWriteFile(path, []byte(`{"urls":["//hostA"]}`)) + + // Open new client against path. + c := NewClient() + if err := c.Open(path); err != nil { + t.Fatalf("unexpected error: %s", err) + } + defer c.Close() + + // Verify that urls were populated. + if a := c.URLs(); !reflect.DeepEqual(a, []url.URL{{Host: "hostA"}}) { + t.Fatalf("unexpected urls: %#v", a) + } +} + +// Ensure a client will ignore non-existent a config file. +func TestClient_Open_WithMissingConfig(t *testing.T) { + path := NewTempFile() + c := NewClient() + c.SetURLs([]url.URL{{Host: "//hostA"}}) + if err := c.Open(path); err != nil { + t.Fatalf("unexpected error: %s", err) + } + defer c.Close() + + // Verify that urls were cleared. + if a := c.URLs(); len(a) != 0 { + t.Fatalf("unexpected urls: %#v", a) + } +} + +// Ensure a client can return an error if the configuration file is corrupt. +func TestClient_Open_WithInvalidConfig(t *testing.T) { + // Write bad configuration file. + path := NewTempFile() + defer os.Remove(path) + MustWriteFile(path, []byte(`{"urls":`)) + + // Open new client against path. + c := NewClient() + if err := c.Open(path); err == nil || err.Error() != `load config: decode config: unexpected EOF` { + t.Fatalf("unexpected error: %s", err) + } + defer c.Close() +} + +// Ensure a client can return an error if the configuration file has non-readable permissions. +func TestClient_Open_WithBadPermConfig(t *testing.T) { + // Write inaccessible configuration file. + path := NewTempFile() + defer os.Remove(path) + MustWriteFile(path, []byte(`{"urls":["//hostA"]}`)) + os.Chmod(path, 0000) + + // Open new client against path. + c := NewClient() + if err := c.Open(path); err == nil || !strings.Contains(err.Error(), `permission denied`) { + t.Fatalf("unexpected error: %s", err) + } + defer c.Close() +} + +// Ensure a client returns an error when reopening. +func TestClient_Open_ErrClientOpen(t *testing.T) { + c := NewClient() + c.Open("") + defer c.Close() + if err := c.Open(""); err != messaging.ErrClientOpen { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure the URL on a client can be set and retrieved. +func TestClient_SetURL(t *testing.T) { + c := NewClient() + defer c.Close() + + c.SetURL(url.URL{Host: "localhost"}) + if u := c.URL(); u != (url.URL{Host: "localhost"}) { + t.Fatalf("unexpected url: %s", u) + } +} + +// Ensure a client will update its connection urls. +func TestClient_SetURL_UpdateConn(t *testing.T) { + c := NewClient() + c.MustOpen("") + c.SetURLs([]url.URL{{Host: "hostA"}}) + defer c.Close() + + // Create connection & check URL. + conn := c.Conn(0) + if u := conn.URL(); u != (url.URL{Host: "hostA"}) { + t.Fatalf("unexpected initial connection url: %s", u) + } + + // Update client url. + c.SetURL(url.URL{Host: "hostB"}) + + // Check that connection url was updated. + if u := conn.URL(); u != (url.URL{Host: "hostB"}) { + t.Fatalf("unexpected new connection url: %s", u) + } +} + +// Ensure a set of URLs can be set on the client and retrieved. +// One of those URLs should be randomly set as the current URL. +func TestClient_SetURLs(t *testing.T) { + c := NewClient() + defer c.Close() + + // Set and retrieve URLs. + c.SetURLs([]url.URL{{Host: "hostA"}, {Host: "hostB"}}) + if a := c.URLs(); a[0] != (url.URL{Host: "hostA"}) { + t.Fatalf("unexpected urls length: %d", len(a)) + } else if a := c.URLs(); a[0] != (url.URL{Host: "hostA"}) { + t.Fatalf("unexpected url(0): %s", a[0]) + } else if a := c.URLs(); a[1] != (url.URL{Host: "hostB"}) { + t.Fatalf("unexpected url(1): %s", a[1]) + } + + // Current URL should be one of the URLs set. + if u := c.URL(); u != (url.URL{Host: "hostA"}) && u != (url.URL{Host: "hostB"}) { + t.Fatalf("unexpected url: %s", u) + } +} + +// Ensure that an empty set of URLs can be set to the client. +func TestClient_SetURLs_NoURLs(t *testing.T) { + c := NewClient() + defer c.Close() + c.SetURLs([]url.URL{}) +} + +// Ensure a client can publish a message to the broker. +func TestClient_Publish(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if req.URL.Path != "/messaging/messages" { + t.Fatalf("unexpected path: %s", req.URL.Path) + } else if req.Method != "POST" { + t.Fatalf("unexpected method: %s", req.Method) + } else if typ := req.URL.Query().Get("type"); typ != "1" { + t.Fatalf("unexpected type: %s", typ) + } else if topicID := req.URL.Query().Get("topicID"); topicID != "2" { + t.Fatalf("unexpected topicID: %s", topicID) + } + + w.Header().Set("X-Broker-Index", "200") + })) + defer s.Close() + + // Create client. + c := NewClient() + c.MustOpen("") + c.SetURL(*MustParseURL(s.URL)) + defer c.Close() + + // Publish message to server. + if index, err := c.Publish(&messaging.Message{Type: 1, TopicID: 2, Data: []byte{0, 0, 0, 0}}); err != nil { + t.Fatal(err) + } else if index != 200 { + t.Fatalf("unexpected index: %d", index) + } +} + +// Ensure a client can redirect a published a message to another broker. +func TestClient_Publish_Redirect(t *testing.T) { + // Create a server to receive redirection. + s0 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if req.URL.Path != "/messaging/messages" { + t.Fatalf("unexpected path: %s", req.URL.Path) + } else if req.Method != "POST" { + t.Fatalf("unexpected method: %s", req.Method) + } else if typ := req.URL.Query().Get("type"); typ != "1" { + t.Fatalf("unexpected type: %s", typ) + } else if topicID := req.URL.Query().Get("topicID"); topicID != "2" { + t.Fatalf("unexpected topicID: %s", topicID) + } + + w.Header().Set("X-Broker-Index", "200") + })) + defer s0.Close() + + // Create another server to redirect to the first one. + s1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + http.Redirect(w, req, s0.URL+req.URL.Path, http.StatusTemporaryRedirect) + })) + defer s1.Close() + + // Create client. + c := NewClient() + c.MustOpen("") + c.SetURL(*MustParseURL(s1.URL)) + defer c.Close() + + // Publish message to server. + if index, err := c.Publish(&messaging.Message{Type: 1, TopicID: 2, Data: []byte{0, 0, 0, 0}}); err != nil { + t.Fatal(err) + } else if index != 200 { + t.Fatalf("unexpected index: %d", index) + } +} + +// Ensure a client returns an error if the responses Location header is invalid. +func TestClient_Publish_Redirect_ErrInvalidLocation(t *testing.T) { + // Create another server to redirect to the first one. + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + http.Redirect(w, req, "http://%f", http.StatusTemporaryRedirect) + })) + defer s.Close() + + // Create client. + c := NewClient() + c.MustOpen("") + c.SetURL(*MustParseURL(s.URL)) + defer c.Close() + + // Publish message to server. + if _, err := c.Publish(&messaging.Message{}); err == nil || err.Error() != `do: invalid redirect location: http://%f` { + t.Fatal(err) + } +} + +// Ensure a client returns an error publishing to a down broker. +func TestClient_Publish_ErrConnectionRefused(t *testing.T) { + s := httptest.NewServer(nil) + s.Close() + + // Create client. + c := NewClient() + c.MustOpen("") + c.SetURL(*MustParseURL(s.URL)) + defer c.Close() + + // Publish message to server. + if _, err := c.Publish(&messaging.Message{}); err == nil || !strings.Contains(err.Error(), `connection refused`) { + t.Fatal(err) + } +} + +// Ensure a client returns an error if returned by the server. +func TestClient_Publish_ErrBrokerError(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Header().Set("X-Broker-Error", "oh no") + w.WriteHeader(http.StatusInternalServerError) + })) + defer s.Close() + + // Create client. + c := NewClient() + c.MustOpen("") + c.SetURL(*MustParseURL(s.URL)) + defer c.Close() + + // Publish message to server. + if _, err := c.Publish(&messaging.Message{}); err == nil || err.Error() != `oh no` { + t.Fatal(err) + } +} + +// Ensure a client returns an error if a non-broker error occurs. +func TestClient_Publish_ErrHTTPError(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer s.Close() + + // Create client. + c := NewClient() + c.MustOpen("") + c.SetURL(*MustParseURL(s.URL)) + defer c.Close() + + // Publish message to server. + if _, err := c.Publish(&messaging.Message{}); err == nil || err.Error() != `cannot publish: status=500` { + t.Fatal(err) + } +} + +// Ensure a client returns an error if the returned index is invalid. +func TestClient_Publish_ErrInvalidIndex(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Header().Set("X-Broker-Index", "xxx") + + })) + defer s.Close() + + // Create client. + c := NewClient() + c.MustOpen("") + c.SetURL(*MustParseURL(s.URL)) + defer c.Close() + + // Publish message to server. + if _, err := c.Publish(&messaging.Message{}); err == nil || err.Error() != `invalid index: strconv.ParseUint: parsing "xxx": invalid syntax` { + t.Fatal(err) + } +} + // Ensure a client can check if the server is alive. func TestClient_Ping(t *testing.T) { var pinged bool @@ -23,10 +330,9 @@ func TestClient_Ping(t *testing.T) { defer s.Close() // Create client. - c := messaging.NewClient() - if err := c.Open("", []url.URL{*MustParseURL(s.URL)}); err != nil { - t.Fatal(err) - } + c := NewClient() + c.MustOpen("") + c.SetURLs([]url.URL{*MustParseURL(s.URL)}) defer c.Close() // Ping server. @@ -37,6 +343,97 @@ func TestClient_Ping(t *testing.T) { } } +// Ensure a client returns an error if the ping cannot connect to the server. +func TestClient_Ping_ErrConnectionRefused(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {})) + s.Close() + + // Create client. + c := NewClient() + c.MustOpen("") + c.SetURLs([]url.URL{*MustParseURL(s.URL)}) + defer c.Close() + + // Ping server. + if err := c.Ping(); err == nil || !strings.Contains(err.Error(), `connection refused`) { + t.Fatal(err) + } +} + +// Ensure a client returns an error if the body of the response cannot be read. +func TestClient_Ping_ErrRead(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Length", "10") + w.Write(make([]byte, 9)) + })) + defer s.Close() + + // Create client. + c := NewClient() + c.MustOpen("") + c.SetURLs([]url.URL{*MustParseURL(s.URL)}) + defer c.Close() + + // Ping server. + if err := c.Ping(); err == nil || err.Error() != `read ping body: unexpected EOF` { + t.Fatal(err) + } +} + +// Ensure a client can receive config data from the broker on ping. +func TestClient_Ping_ReceiveConfig(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte(`{"urls":["//local.dev"]}`)) + })) + defer s.Close() + + // Create a temp file for configuration. + path := NewTempFile() + defer os.Remove(path) + + // Create client. + c := NewClient() + c.MustOpen(path) + c.SetURLs([]url.URL{*MustParseURL(s.URL)}) + defer c.Close() + + // Ping server. + if err := c.Ping(); err != nil { + t.Fatal(err) + } + + // Confirm config change. + if a := c.URLs(); len(a) != 1 { + t.Fatalf("unexpected urls length: %d", len(a)) + } else if a[0] != (url.URL{Host: "local.dev"}) { + t.Fatalf("unexpected url(0): %s", a[0]) + } + + // Confirm config was rewritten. + if b, _ := ioutil.ReadFile(path); string(b) != `{"urls":["//local.dev"]}`+"\n" { + t.Fatalf("unexpected config file: %s", b) + } +} + +// Ensure a client returns an error when ping response is invalid. +func TestClient_Ping_ErrInvalidResponse(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte(`{"urls":`)) + })) + defer s.Close() + + // Create client. + c := NewClient() + c.MustOpen("") + c.SetURLs([]url.URL{*MustParseURL(s.URL)}) + defer c.Close() + + // Ping server. + if err := c.Ping(); err == nil || err.Error() != `unmarshal config: unexpected end of JSON input` { + t.Fatal(err) + } +} + // Ensure a client can be opened and connections can be created. func TestClient_Conn(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -50,15 +447,16 @@ func TestClient_Conn(t *testing.T) { defer s.Close() // Create and open connection to server. - c := messaging.NewClient() - if err := c.Open("", []url.URL{*MustParseURL(s.URL)}); err != nil { - t.Fatal(err) - } + c := NewClient() + c.MustOpen("") + c.SetURLs([]url.URL{*MustParseURL(s.URL)}) // Connect on topic #1. conn1 := c.Conn(1) if err := conn1.Open(0, false); err != nil { t.Fatal(err) + } else if conn1.TopicID() != 1 { + t.Fatalf("unexpected topic id(1): %d", conn1.TopicID()) } else if m := <-conn1.C(); !reflect.DeepEqual(m, &messaging.Message{Index: 1, Data: []byte{100}}) { t.Fatalf("unexpected message(1): %#v", m) } @@ -205,21 +603,122 @@ func TestConn_Heartbeat(t *testing.T) { } } -// Client represents a test wrapper for the broker client. +// Ensure that a connection returns an error if it cannot connect to the broker. +func TestConn_Heartbeat_ErrConnectionRefused(t *testing.T) { + s := httptest.NewServer(nil) + s.Close() + + // Create connection and heartbeat. + c := messaging.NewConn(0) + c.SetURL(*MustParseURL(s.URL)) + if err := c.Heartbeat(); err == nil || !strings.Contains(err.Error(), `connection refused`) { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that a connection returns an error if the heartbeat is redirected. +// This occurs when the broker is not the leader. The client will update the URL later. +func TestConn_Heartbeat_ErrNoLeader(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusTemporaryRedirect) + })) + defer s.Close() + + // Create connection and heartbeat. + c := messaging.NewConn(0) + c.SetURL(*MustParseURL(s.URL)) + if err := c.Heartbeat(); err != messaging.ErrNoLeader { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that a connection returns a broker error while heartbeating. +func TestConn_Heartbeat_ErrBrokerError(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Header().Set("X-Broker-Error", "oh no") + w.WriteHeader(http.StatusInternalServerError) + })) + defer s.Close() + + // Create connection and heartbeat. + c := messaging.NewConn(0) + c.SetURL(*MustParseURL(s.URL)) + if err := c.Heartbeat(); err == nil || err.Error() != `oh no` { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that a connection returns an http error while heartbeating. +func TestConn_Heartbeat_ErrHTTPError(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer s.Close() + + // Create connection and heartbeat. + c := messaging.NewConn(0) + c.SetURL(*MustParseURL(s.URL)) + if err := c.Heartbeat(); err == nil || err.Error() != `heartbeat error: status=500` { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that the client config can be serialized to JSON. +func TestClientConfig_MarshalJSON(t *testing.T) { + c := messaging.ClientConfig{URLs: []url.URL{{Host: "hostA"}, {Host: "hostB"}}} + if b, err := json.Marshal(&c); err != nil { + t.Fatal(err) + } else if string(b) != `{"urls":["//hostA","//hostB"]}` { + t.Fatalf("unexpected json: %s", b) + } +} + +// Ensure that the client config can be deserialized from JSON. +func TestClientConfig_UnmarshalJSON(t *testing.T) { + var c messaging.ClientConfig + if err := json.Unmarshal([]byte(`{"urls":["//hostA","//hostB"]}`), &c); err != nil { + t.Fatal(err) + } + if len(c.URLs) != 2 { + t.Fatalf("unexpected url count: %d", len(c.URLs)) + } else if c.URLs[0] != (url.URL{Host: "hostA"}) { + t.Fatalf("unexpected url(0): %s", c.URLs[0]) + } else if c.URLs[1] != (url.URL{Host: "hostB"}) { + t.Fatalf("unexpected url(1): %s", c.URLs[1]) + } +} + +// Ensure that the client config returns an error when handling an invalid field type. +func TestClientConfig_UnmarshalJSON_ErrInvalidType(t *testing.T) { + var c messaging.ClientConfig + if err := json.Unmarshal([]byte(`{"urls":0}`), &c); err == nil || err.Error() != `json: cannot unmarshal number into Go value of type []string` { + t.Fatalf("unexpected error: %s", err) + } +} + +// Ensure that the client config returns an error when handling an invalid url. +func TestClientConfig_UnmarshalJSON_ErrInvalidURL(t *testing.T) { + var c messaging.ClientConfig + if err := json.Unmarshal([]byte(`{"urls":["http://%foo"]}`), &c); err == nil || err.Error() != `parse http://%foo: hexadecimal escape in host` { + t.Fatalf("unexpected error: %s", err) + } +} + +// Client represents a test wrapper for messaging.Client. type Client struct { *messaging.Client } -// NewClient returns a new instance of Client. -func NewClient(replicaID uint64) *Client { - return &Client{ - Client: messaging.NewClient(), - } +// NewClient returns an new instance of Client. +func NewClient() *Client { + return &Client{messaging.NewClient()} } -// Close shuts down the client and server. -func (c *Client) Close() { - c.Client.Close() +// MustOpen opens the client. Panic on error. +func (c *Client) MustOpen(path string) { + if err := c.Open(path); err != nil { + panic(err.Error()) + } } // NewTempFile returns the path of a new temporary file. @@ -229,6 +728,7 @@ func NewTempFile() string { if err != nil { panic(err) } - defer f.Close() + f.Close() + os.Remove(f.Name()) return f.Name() } diff --git a/messaging/errors.go b/messaging/errors.go index 35462b99e1..ce87031719 100644 --- a/messaging/errors.go +++ b/messaging/errors.go @@ -46,9 +46,6 @@ var ( // ErrConnCannotReuse is returned when opening a previously closed connection. ErrConnCannotReuse = errors.New("cannot reuse connection") - // ErrBrokerURLRequired is returned when opening a broker without URLs. - ErrBrokerURLRequired = errors.New("broker url required") - // ErrMessageTypeRequired is returned publishing a message without a type. ErrMessageTypeRequired = errors.New("message type required") diff --git a/messaging/handler.go b/messaging/handler.go index 51a1492356..f7318c71cf 100644 --- a/messaging/handler.go +++ b/messaging/handler.go @@ -1,6 +1,7 @@ package messaging import ( + "encoding/json" "io" "io/ioutil" "log" @@ -15,6 +16,8 @@ import ( // Handler represents an HTTP handler by the broker. type Handler struct { Broker interface { + URLs() []url.URL + IsLeader() bool LeaderURL() url.URL TopicReader(topicID, index uint64, streaming bool) io.ReadCloser Publish(m *Message) (uint64, error) @@ -169,6 +172,20 @@ func (h *Handler) postHeartbeat(w http.ResponseWriter, r *http.Request) { // servePing returns a status 200. func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) { + // Redirect if not leader. + if !h.Broker.IsLeader() { + h.redirectToLeader(w, r) + return + } + + // Write out client configuration. + var config ClientConfig + config.URLs = h.Broker.URLs() + if err := json.NewEncoder(w).Encode(&config); err != nil { + log.Printf("unable to write client config: %s", err) + return + } + w.WriteHeader(http.StatusOK) } diff --git a/messaging/handler_test.go b/messaging/handler_test.go index 01ee316d38..198be0dc9c 100644 --- a/messaging/handler_test.go +++ b/messaging/handler_test.go @@ -3,6 +3,7 @@ package messaging_test import ( "bytes" "io" + "io/ioutil" "net/http" "net/http/httptest" "net/url" @@ -195,19 +196,48 @@ func TestHandler_postHeartbeat_ErrMethodNotAllowed(t *testing.T) { resp.Body.Close() } -// Ensure a handler can respond to a ping. +// Ensure a handler can respond to a ping with the current cluster configuration. func TestHandler_servePing(t *testing.T) { - s := httptest.NewServer(&messaging.Handler{}) + var hb HandlerBroker + hb.IsLeaderFunc = func() bool { return true } + hb.URLsFunc = func() []url.URL { return []url.URL{{Host: "hostA"}, {Host: "hostB"}} } + s := httptest.NewServer(&messaging.Handler{Broker: &hb}) defer s.Close() // Send request to the broker. resp, err := http.Post(s.URL+`/messaging/ping`, "application/octet-stream", nil) if err != nil { t.Fatal(err) - } else if resp.StatusCode != http.StatusOK { - t.Fatalf("unexpected status: %d: %s", resp.StatusCode, resp.Header.Get("X-Broker-Error")) } - resp.Body.Close() + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status: %d: %s", resp.StatusCode, resp.Header.Get("X-Broker-Error")) + } else if b, _ := ioutil.ReadAll(resp.Body); string(b) != `{"urls":["//hostA","//hostB"]}`+"\n" { + t.Fatalf("unexpected body: %s", b) + } +} + +// Ensure a handler can respond to a ping with the current cluster configuration. +func TestHandler_servePing_NotLeader(t *testing.T) { + var hb HandlerBroker + hb.IsLeaderFunc = func() bool { return false } + hb.LeaderURLFunc = func() url.URL { return url.URL{Scheme: "http", Host: "other"} } + s := httptest.NewServer(&messaging.Handler{Broker: &hb}) + defer s.Close() + + // Send request to the broker. + resp, err := http.Post(s.URL+`/messaging/ping`, "application/octet-stream", nil) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusTemporaryRedirect { + t.Fatalf("unexpected status: %d: %s", resp.StatusCode, resp.Header.Get("X-Broker-Error")) + } else if loc := resp.Header.Get("Location"); loc != "http://other/messaging/ping" { + t.Fatalf("unexpected redirect location: %s", loc) + } } // Ensure the handler routes raft requests to the raft handler. @@ -239,12 +269,16 @@ func TestHandler_ErrNotFound(t *testing.T) { // HandlerBroker is a mockable type that implements Handler.Broker. type HandlerBroker struct { + URLsFunc func() []url.URL + IsLeaderFunc func() bool LeaderURLFunc func() url.URL PublishFunc func(m *messaging.Message) (uint64, error) TopicReaderFunc func(topicID, index uint64, streaming bool) io.ReadCloser SetTopicMaxIndexFunc func(topicID, index uint64) error } +func (b *HandlerBroker) URLs() []url.URL { return b.URLsFunc() } +func (b *HandlerBroker) IsLeader() bool { return b.IsLeaderFunc() } func (b *HandlerBroker) LeaderURL() url.URL { return b.LeaderURLFunc() } func (b *HandlerBroker) Publish(m *messaging.Message) (uint64, error) { return b.PublishFunc(m) } func (b *HandlerBroker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser { diff --git a/raft/log.go b/raft/log.go index 1d7c14b46c..e47696c842 100644 --- a/raft/log.go +++ b/raft/log.go @@ -172,6 +172,23 @@ func (l *Log) SetURL(u url.URL) { l.url = u } +// URLs returns a list of all URLs in the cluster. +func (l *Log) URLs() []url.URL { + l.mu.Lock() + defer l.mu.Unlock() + + if l.config == nil { + return nil + } + + var a []url.URL + for _, n := range l.config.Nodes { + a = append(a, n.URL) + } + + return a +} + func (l *Log) idPath() string { return filepath.Join(l.path, "id") } func (l *Log) termPath() string { return filepath.Join(l.path, "term") } func (l *Log) configPath() string { return filepath.Join(l.path, "config") } @@ -549,6 +566,13 @@ func (l *Log) tracef(msg string, v ...interface{}) { } } +// IsLeader returns true if the log is the current leader. +func (l *Log) IsLeader() bool { + l.mu.Lock() + defer l.mu.Unlock() + return l.id != 0 && l.id == l.leaderID +} + // Leader returns the id and URL associated with the current leader. // Returns zero if there is no current leader. func (l *Log) Leader() (id uint64, u url.URL) { @@ -557,17 +581,6 @@ func (l *Log) Leader() (id uint64, u url.URL) { return l.leader() } -// ClusterID returns the identifier for the cluster. -// Returns zero if the cluster has not been initialized yet. -func (l *Log) ClusterID() uint64 { - l.mu.Lock() - defer l.mu.Unlock() - if l.config == nil { - return 0 - } - return l.config.ClusterID -} - func (l *Log) leader() (id uint64, u url.URL) { // Ignore if there's no configuration set. if l.config == nil { @@ -583,6 +596,17 @@ func (l *Log) leader() (id uint64, u url.URL) { return n.ID, n.URL } +// ClusterID returns the identifier for the cluster. +// Returns zero if the cluster has not been initialized yet. +func (l *Log) ClusterID() uint64 { + l.mu.Lock() + defer l.mu.Unlock() + if l.config == nil { + return 0 + } + return l.config.ClusterID +} + // Join contacts a node in the cluster to request membership. // A log cannot join a cluster if it has already been initialized. func (l *Log) Join(u url.URL) error { diff --git a/server.go b/server.go index 20415ef7e6..d492df0ea7 100644 --- a/server.go +++ b/server.go @@ -2964,9 +2964,13 @@ func (r *Results) Error() error { // MessagingClient represents the client used to connect to brokers. type MessagingClient interface { - Open(path string, urls []url.URL) error + Open(path string) error Close() error + // Retrieves or sets the current list of broker URLs. + URLs() []url.URL + SetURLs([]url.URL) + // Publishes a message to the broker. Publish(m *messaging.Message) (index uint64, err error) diff --git a/test/messaging.go b/test/messaging.go index ae0e9d852a..10feb942ea 100644 --- a/test/messaging.go +++ b/test/messaging.go @@ -32,7 +32,7 @@ func NewMessagingClient() *MessagingClient { return c } -func (c *MessagingClient) Open(path string, urls []url.URL) error { return nil } +func (c *MessagingClient) Open(path string) error { return nil } // Close closes all open connections. func (c *MessagingClient) Close() error { @@ -46,6 +46,9 @@ func (c *MessagingClient) Close() error { return nil } +func (c *MessagingClient) URLs() []url.URL { return []url.URL{{Host: "local"}} } +func (c *MessagingClient) SetURLs([]url.URL) {} + func (c *MessagingClient) Publish(m *messaging.Message) (uint64, error) { return c.PublishFunc(m) } // DefaultPublishFunc sets an autoincrementing index on the message and sends it to each topic connection.