Add config notifications and increased test coverage.

pull/1935/head
Ben Johnson 2015-03-14 13:36:06 -06:00
parent fc189cd2ae
commit 53dbec8232
11 changed files with 790 additions and 80 deletions

View File

@ -300,10 +300,20 @@ func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, conf
// Create messaging client to the brokers.
c := influxdb.NewMessagingClient()
c.SetLogOutput(w)
if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile), clientJoinURLs); err != nil {
if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile)); err != nil {
log.Fatalf("messaging client error: %s", err)
}
// If join URLs were passed in then use them to override the client's URLs.
if len(clientJoinURLs) > 0 {
c.SetURLs(clientJoinURLs)
}
// If no URLs exist on the client the return an error since we cannot reach a broker.
if len(c.URLs()) == 0 {
log.Fatal("messaging client has no broker URLs")
}
// Create and open the server.
s := influxdb.NewServer()
s.SetLogOutput(w)

View File

@ -36,7 +36,9 @@ type Broker struct {
// Log is the distributed raft log that commands are applied to.
Log interface {
URL() url.URL
URLs() []url.URL
Leader() (uint64, url.URL)
IsLeader() bool
ClusterID() uint64
Apply(data []byte) (index uint64, err error)
}
@ -68,6 +70,12 @@ func (b *Broker) metaPath() string {
// URL returns the URL of the broker.
func (b *Broker) URL() url.URL { return b.Log.URL() }
// URLs returns a list of all broker URLs in the cluster.
func (b *Broker) URLs() []url.URL { return b.Log.URLs() }
// IsLeader returns true if the broker is the current cluster leader.
func (b *Broker) IsLeader() bool { return b.Log.IsLeader() }
// LeaderURL returns the URL to the leader broker.
func (b *Broker) LeaderURL() url.URL {
_, u := b.Log.Leader()

View File

@ -707,14 +707,18 @@ func (b *Broker) MustReadAllTopic(topicID uint64) (a []*messaging.Message) {
type BrokerLog struct {
ApplyFunc func(data []byte) (uint64, error)
ClusterIDFunc func() uint64
IsLeaderFunc func() bool
LeaderFunc func() (uint64, url.URL)
URLFunc func() url.URL
URLsFunc func() []url.URL
}
func (l *BrokerLog) Apply(data []byte) (uint64, error) { return l.ApplyFunc(data) }
func (l *BrokerLog) ClusterID() uint64 { return l.ClusterIDFunc() }
func (l *BrokerLog) IsLeader() bool { return l.IsLeaderFunc() }
func (l *BrokerLog) Leader() (uint64, url.URL) { return l.LeaderFunc() }
func (l *BrokerLog) URL() url.URL { return l.URLFunc() }
func (l *BrokerLog) URLs() []url.URL { return l.URLsFunc() }
// Messages represents a collection of messages.
// This type provides helper functions.

View File

@ -17,17 +17,20 @@ import (
"time"
)
// DefaultReconnectTimeout is the default time to wait between when a broker
// stream disconnects and another connection is retried.
const DefaultReconnectTimeout = 1000 * time.Millisecond
const (
// DefaultReconnectTimeout is the default time to wait between when a broker
// stream disconnects and another connection is retried.
DefaultReconnectTimeout = 1000 * time.Millisecond
// DefaultPingInterval is the default time to wait between checks to the broker.
const DefaultPingInterval = 1000 * time.Millisecond
// DefaultPingInterval is the default time to wait between checks to the broker.
DefaultPingInterval = 1000 * time.Millisecond
)
// Client represents a client for the broker's HTTP API.
type Client struct {
mu sync.Mutex
conns []*Conn
path string // config file path
conns []*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
@ -80,10 +83,27 @@ func (c *Client) setURL(u url.URL) {
}
}
// RandomizeURL sets a random URL from the configuration.
func (c *Client) RandomizeURL() {
// URLs returns a list of possible broker URLs to connect to.
func (c *Client) URLs() []url.URL {
c.mu.Lock()
defer c.mu.Unlock()
return c.urls
}
// SetURLs sets a list of possible URLs to connect to for the client and its connections.
func (c *Client) SetURLs(a []url.URL) {
c.mu.Lock()
defer c.mu.Unlock()
c.setURLs(a)
}
func (c *Client) setURLs(a []url.URL) {
// Ignore if the URL list is the same.
if urlsEqual(c.urls, a) {
return
}
c.urls = a
c.randomizeURL()
}
@ -102,8 +122,8 @@ func (c *Client) SetLogOutput(w io.Writer) {
c.Logger = log.New(w, "[messaging] ", log.LstdFlags)
}
// Open reads the configuration from the specified path or uses the URLs provided.
func (c *Client) Open(path string, urls []url.URL) error {
// Open opens the client and reads the configuration from the specified path.
func (c *Client) Open(path string) error {
c.mu.Lock()
defer c.mu.Unlock()
@ -113,29 +133,12 @@ func (c *Client) Open(path string, urls []url.URL) error {
}
// Read URLs from file if no URLs are provided.
if len(urls) == 0 {
if b, err := ioutil.ReadFile(path); os.IsNotExist(err) {
// nop
} else if err != nil {
return err
} else {
var config ClientConfig
if err := json.Unmarshal(b, &config); err != nil {
return err
}
c.urls = config.Brokers
}
c.path = path
if err := c.loadConfig(); err != nil {
_ = c.close()
return fmt.Errorf("load config: %s", err)
}
// Ensure we have at least one URL.
if len(urls) < 1 {
return ErrBrokerURLRequired
}
// Set the URLs whether they're from the config or passed in.
c.urls = urls
c.randomizeURL()
// Set open flag.
c.opened = true
@ -151,7 +154,10 @@ func (c *Client) Open(path string, urls []url.URL) error {
func (c *Client) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.close()
}
func (c *Client) close() error {
// Return error if the client is already closed.
if !c.opened {
return ErrClientClosed
@ -180,6 +186,53 @@ func (c *Client) Close() error {
return nil
}
// loadConfig reads the configuration from disk and sets the options on the client.
func (c *Client) loadConfig() error {
// Open config file for reading.
f, err := os.Open(c.path)
if os.IsNotExist(err) {
c.urls = nil
return nil
} else if err != nil {
return fmt.Errorf("open config: %s", err)
}
defer f.Close()
// Decode config from file.
var config ClientConfig
if err := json.NewDecoder(f).Decode(&config); err != nil {
return fmt.Errorf("decode config: %s", err)
}
// Set options.
c.urls = config.URLs
return nil
}
// setConfig writes a new config to disk and updates urls on the client.
func (c *Client) setConfig(config ClientConfig) error {
// Only write to disk if we have a path.
if c.path != "" {
// Open config file for writing.
f, err := os.Create(c.path)
if err != nil {
return fmt.Errorf("create: %s", err)
}
defer f.Close()
// Encode config to file.
if err := json.NewEncoder(f).Encode(&config); err != nil {
return fmt.Errorf("encode config: %s", err)
}
}
// Set options.
c.urls = config.URLs
return nil
}
// Publish sends a message to the broker and returns an index or error.
func (c *Client) Publish(m *Message) (uint64, error) {
// Post message to broker.
@ -198,7 +251,7 @@ func (c *Client) Publish(m *Message) (uint64, error) {
if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" {
return 0, errors.New(errstr)
}
return 0, fmt.Errorf("cannot publish(%d)", resp.StatusCode)
return 0, fmt.Errorf("cannot publish: status=%d", resp.StatusCode)
}
// Parse broker index.
@ -218,7 +271,26 @@ func (c *Client) Ping() error {
if err != nil {
return err
}
resp.Body.Close()
defer resp.Body.Close()
// Read entire body.
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("read ping body: %s", err)
}
// Update config if body is passed back.
if len(b) != 0 {
var config ClientConfig
if err := json.Unmarshal(b, &config); err != nil {
return fmt.Errorf("unmarshal config: %s", err)
}
if err := c.setConfig(config); err != nil {
return fmt.Errorf("update config: %s", err)
}
}
return nil
}
@ -293,15 +365,39 @@ func (c *Client) pinger(closing chan struct{}) {
// ClientConfig represents the configuration that must be persisted across restarts.
type ClientConfig struct {
Brokers []url.URL `json:"brokers"`
Leader url.URL `json:"leader"`
URLs []url.URL
}
// NewClientConfig returns a new instance of ClientConfig.
func NewClientConfig(u []url.URL) *ClientConfig {
return &ClientConfig{
Brokers: u,
func (c ClientConfig) MarshalJSON() ([]byte, error) {
var other clientConfigJSON
other.URLs = make([]string, len(c.URLs))
for i, u := range c.URLs {
other.URLs[i] = u.String()
}
return json.Marshal(&other)
}
func (c *ClientConfig) UnmarshalJSON(b []byte) error {
var other clientConfigJSON
if err := json.Unmarshal(b, &other); err != nil {
return err
}
c.URLs = make([]url.URL, len(other.URLs))
for i := range other.URLs {
u, err := url.Parse(other.URLs[i])
if err != nil {
return err
}
c.URLs[i] = *u
}
return nil
}
// clientConfigJSON represents the JSON
type clientConfigJSON struct {
URLs []string `json:"urls"`
}
// Conn represents a stream over the client for a single topic.
@ -465,7 +561,7 @@ func (c *Conn) Heartbeat() error {
if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" {
return errors.New(errstr)
}
return fmt.Errorf("heartbeat error: %d", resp.StatusCode)
return fmt.Errorf("heartbeat error: status=%d", resp.StatusCode)
}
return nil
}
@ -563,3 +659,16 @@ func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error {
}
}
}
// urlsEqual returns true if a and b contain the same URLs in the same order.
func urlsEqual(a, b []url.URL) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

View File

@ -1,16 +1,323 @@
package messaging_test
import (
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"strings"
"testing"
"github.com/influxdb/influxdb/messaging"
)
// Ensure a client can open the configuration file, if it exists.
func TestClient_Open_WithConfig(t *testing.T) {
// Write configuration file.
path := NewTempFile()
defer os.Remove(path)
MustWriteFile(path, []byte(`{"urls":["//hostA"]}`))
// Open new client against path.
c := NewClient()
if err := c.Open(path); err != nil {
t.Fatalf("unexpected error: %s", err)
}
defer c.Close()
// Verify that urls were populated.
if a := c.URLs(); !reflect.DeepEqual(a, []url.URL{{Host: "hostA"}}) {
t.Fatalf("unexpected urls: %#v", a)
}
}
// Ensure a client will ignore non-existent a config file.
func TestClient_Open_WithMissingConfig(t *testing.T) {
path := NewTempFile()
c := NewClient()
c.SetURLs([]url.URL{{Host: "//hostA"}})
if err := c.Open(path); err != nil {
t.Fatalf("unexpected error: %s", err)
}
defer c.Close()
// Verify that urls were cleared.
if a := c.URLs(); len(a) != 0 {
t.Fatalf("unexpected urls: %#v", a)
}
}
// Ensure a client can return an error if the configuration file is corrupt.
func TestClient_Open_WithInvalidConfig(t *testing.T) {
// Write bad configuration file.
path := NewTempFile()
defer os.Remove(path)
MustWriteFile(path, []byte(`{"urls":`))
// Open new client against path.
c := NewClient()
if err := c.Open(path); err == nil || err.Error() != `load config: decode config: unexpected EOF` {
t.Fatalf("unexpected error: %s", err)
}
defer c.Close()
}
// Ensure a client can return an error if the configuration file has non-readable permissions.
func TestClient_Open_WithBadPermConfig(t *testing.T) {
// Write inaccessible configuration file.
path := NewTempFile()
defer os.Remove(path)
MustWriteFile(path, []byte(`{"urls":["//hostA"]}`))
os.Chmod(path, 0000)
// Open new client against path.
c := NewClient()
if err := c.Open(path); err == nil || !strings.Contains(err.Error(), `permission denied`) {
t.Fatalf("unexpected error: %s", err)
}
defer c.Close()
}
// Ensure a client returns an error when reopening.
func TestClient_Open_ErrClientOpen(t *testing.T) {
c := NewClient()
c.Open("")
defer c.Close()
if err := c.Open(""); err != messaging.ErrClientOpen {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure the URL on a client can be set and retrieved.
func TestClient_SetURL(t *testing.T) {
c := NewClient()
defer c.Close()
c.SetURL(url.URL{Host: "localhost"})
if u := c.URL(); u != (url.URL{Host: "localhost"}) {
t.Fatalf("unexpected url: %s", u)
}
}
// Ensure a client will update its connection urls.
func TestClient_SetURL_UpdateConn(t *testing.T) {
c := NewClient()
c.MustOpen("")
c.SetURLs([]url.URL{{Host: "hostA"}})
defer c.Close()
// Create connection & check URL.
conn := c.Conn(0)
if u := conn.URL(); u != (url.URL{Host: "hostA"}) {
t.Fatalf("unexpected initial connection url: %s", u)
}
// Update client url.
c.SetURL(url.URL{Host: "hostB"})
// Check that connection url was updated.
if u := conn.URL(); u != (url.URL{Host: "hostB"}) {
t.Fatalf("unexpected new connection url: %s", u)
}
}
// Ensure a set of URLs can be set on the client and retrieved.
// One of those URLs should be randomly set as the current URL.
func TestClient_SetURLs(t *testing.T) {
c := NewClient()
defer c.Close()
// Set and retrieve URLs.
c.SetURLs([]url.URL{{Host: "hostA"}, {Host: "hostB"}})
if a := c.URLs(); a[0] != (url.URL{Host: "hostA"}) {
t.Fatalf("unexpected urls length: %d", len(a))
} else if a := c.URLs(); a[0] != (url.URL{Host: "hostA"}) {
t.Fatalf("unexpected url(0): %s", a[0])
} else if a := c.URLs(); a[1] != (url.URL{Host: "hostB"}) {
t.Fatalf("unexpected url(1): %s", a[1])
}
// Current URL should be one of the URLs set.
if u := c.URL(); u != (url.URL{Host: "hostA"}) && u != (url.URL{Host: "hostB"}) {
t.Fatalf("unexpected url: %s", u)
}
}
// Ensure that an empty set of URLs can be set to the client.
func TestClient_SetURLs_NoURLs(t *testing.T) {
c := NewClient()
defer c.Close()
c.SetURLs([]url.URL{})
}
// Ensure a client can publish a message to the broker.
func TestClient_Publish(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path != "/messaging/messages" {
t.Fatalf("unexpected path: %s", req.URL.Path)
} else if req.Method != "POST" {
t.Fatalf("unexpected method: %s", req.Method)
} else if typ := req.URL.Query().Get("type"); typ != "1" {
t.Fatalf("unexpected type: %s", typ)
} else if topicID := req.URL.Query().Get("topicID"); topicID != "2" {
t.Fatalf("unexpected topicID: %s", topicID)
}
w.Header().Set("X-Broker-Index", "200")
}))
defer s.Close()
// Create client.
c := NewClient()
c.MustOpen("")
c.SetURL(*MustParseURL(s.URL))
defer c.Close()
// Publish message to server.
if index, err := c.Publish(&messaging.Message{Type: 1, TopicID: 2, Data: []byte{0, 0, 0, 0}}); err != nil {
t.Fatal(err)
} else if index != 200 {
t.Fatalf("unexpected index: %d", index)
}
}
// Ensure a client can redirect a published a message to another broker.
func TestClient_Publish_Redirect(t *testing.T) {
// Create a server to receive redirection.
s0 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path != "/messaging/messages" {
t.Fatalf("unexpected path: %s", req.URL.Path)
} else if req.Method != "POST" {
t.Fatalf("unexpected method: %s", req.Method)
} else if typ := req.URL.Query().Get("type"); typ != "1" {
t.Fatalf("unexpected type: %s", typ)
} else if topicID := req.URL.Query().Get("topicID"); topicID != "2" {
t.Fatalf("unexpected topicID: %s", topicID)
}
w.Header().Set("X-Broker-Index", "200")
}))
defer s0.Close()
// Create another server to redirect to the first one.
s1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
http.Redirect(w, req, s0.URL+req.URL.Path, http.StatusTemporaryRedirect)
}))
defer s1.Close()
// Create client.
c := NewClient()
c.MustOpen("")
c.SetURL(*MustParseURL(s1.URL))
defer c.Close()
// Publish message to server.
if index, err := c.Publish(&messaging.Message{Type: 1, TopicID: 2, Data: []byte{0, 0, 0, 0}}); err != nil {
t.Fatal(err)
} else if index != 200 {
t.Fatalf("unexpected index: %d", index)
}
}
// Ensure a client returns an error if the responses Location header is invalid.
func TestClient_Publish_Redirect_ErrInvalidLocation(t *testing.T) {
// Create another server to redirect to the first one.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
http.Redirect(w, req, "http://%f", http.StatusTemporaryRedirect)
}))
defer s.Close()
// Create client.
c := NewClient()
c.MustOpen("")
c.SetURL(*MustParseURL(s.URL))
defer c.Close()
// Publish message to server.
if _, err := c.Publish(&messaging.Message{}); err == nil || err.Error() != `do: invalid redirect location: http://%f` {
t.Fatal(err)
}
}
// Ensure a client returns an error publishing to a down broker.
func TestClient_Publish_ErrConnectionRefused(t *testing.T) {
s := httptest.NewServer(nil)
s.Close()
// Create client.
c := NewClient()
c.MustOpen("")
c.SetURL(*MustParseURL(s.URL))
defer c.Close()
// Publish message to server.
if _, err := c.Publish(&messaging.Message{}); err == nil || !strings.Contains(err.Error(), `connection refused`) {
t.Fatal(err)
}
}
// Ensure a client returns an error if returned by the server.
func TestClient_Publish_ErrBrokerError(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("X-Broker-Error", "oh no")
w.WriteHeader(http.StatusInternalServerError)
}))
defer s.Close()
// Create client.
c := NewClient()
c.MustOpen("")
c.SetURL(*MustParseURL(s.URL))
defer c.Close()
// Publish message to server.
if _, err := c.Publish(&messaging.Message{}); err == nil || err.Error() != `oh no` {
t.Fatal(err)
}
}
// Ensure a client returns an error if a non-broker error occurs.
func TestClient_Publish_ErrHTTPError(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer s.Close()
// Create client.
c := NewClient()
c.MustOpen("")
c.SetURL(*MustParseURL(s.URL))
defer c.Close()
// Publish message to server.
if _, err := c.Publish(&messaging.Message{}); err == nil || err.Error() != `cannot publish: status=500` {
t.Fatal(err)
}
}
// Ensure a client returns an error if the returned index is invalid.
func TestClient_Publish_ErrInvalidIndex(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("X-Broker-Index", "xxx")
}))
defer s.Close()
// Create client.
c := NewClient()
c.MustOpen("")
c.SetURL(*MustParseURL(s.URL))
defer c.Close()
// Publish message to server.
if _, err := c.Publish(&messaging.Message{}); err == nil || err.Error() != `invalid index: strconv.ParseUint: parsing "xxx": invalid syntax` {
t.Fatal(err)
}
}
// Ensure a client can check if the server is alive.
func TestClient_Ping(t *testing.T) {
var pinged bool
@ -23,10 +330,9 @@ func TestClient_Ping(t *testing.T) {
defer s.Close()
// Create client.
c := messaging.NewClient()
if err := c.Open("", []url.URL{*MustParseURL(s.URL)}); err != nil {
t.Fatal(err)
}
c := NewClient()
c.MustOpen("")
c.SetURLs([]url.URL{*MustParseURL(s.URL)})
defer c.Close()
// Ping server.
@ -37,6 +343,97 @@ func TestClient_Ping(t *testing.T) {
}
}
// Ensure a client returns an error if the ping cannot connect to the server.
func TestClient_Ping_ErrConnectionRefused(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {}))
s.Close()
// Create client.
c := NewClient()
c.MustOpen("")
c.SetURLs([]url.URL{*MustParseURL(s.URL)})
defer c.Close()
// Ping server.
if err := c.Ping(); err == nil || !strings.Contains(err.Error(), `connection refused`) {
t.Fatal(err)
}
}
// Ensure a client returns an error if the body of the response cannot be read.
func TestClient_Ping_ErrRead(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Length", "10")
w.Write(make([]byte, 9))
}))
defer s.Close()
// Create client.
c := NewClient()
c.MustOpen("")
c.SetURLs([]url.URL{*MustParseURL(s.URL)})
defer c.Close()
// Ping server.
if err := c.Ping(); err == nil || err.Error() != `read ping body: unexpected EOF` {
t.Fatal(err)
}
}
// Ensure a client can receive config data from the broker on ping.
func TestClient_Ping_ReceiveConfig(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte(`{"urls":["//local.dev"]}`))
}))
defer s.Close()
// Create a temp file for configuration.
path := NewTempFile()
defer os.Remove(path)
// Create client.
c := NewClient()
c.MustOpen(path)
c.SetURLs([]url.URL{*MustParseURL(s.URL)})
defer c.Close()
// Ping server.
if err := c.Ping(); err != nil {
t.Fatal(err)
}
// Confirm config change.
if a := c.URLs(); len(a) != 1 {
t.Fatalf("unexpected urls length: %d", len(a))
} else if a[0] != (url.URL{Host: "local.dev"}) {
t.Fatalf("unexpected url(0): %s", a[0])
}
// Confirm config was rewritten.
if b, _ := ioutil.ReadFile(path); string(b) != `{"urls":["//local.dev"]}`+"\n" {
t.Fatalf("unexpected config file: %s", b)
}
}
// Ensure a client returns an error when ping response is invalid.
func TestClient_Ping_ErrInvalidResponse(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte(`{"urls":`))
}))
defer s.Close()
// Create client.
c := NewClient()
c.MustOpen("")
c.SetURLs([]url.URL{*MustParseURL(s.URL)})
defer c.Close()
// Ping server.
if err := c.Ping(); err == nil || err.Error() != `unmarshal config: unexpected end of JSON input` {
t.Fatal(err)
}
}
// Ensure a client can be opened and connections can be created.
func TestClient_Conn(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@ -50,15 +447,16 @@ func TestClient_Conn(t *testing.T) {
defer s.Close()
// Create and open connection to server.
c := messaging.NewClient()
if err := c.Open("", []url.URL{*MustParseURL(s.URL)}); err != nil {
t.Fatal(err)
}
c := NewClient()
c.MustOpen("")
c.SetURLs([]url.URL{*MustParseURL(s.URL)})
// Connect on topic #1.
conn1 := c.Conn(1)
if err := conn1.Open(0, false); err != nil {
t.Fatal(err)
} else if conn1.TopicID() != 1 {
t.Fatalf("unexpected topic id(1): %d", conn1.TopicID())
} else if m := <-conn1.C(); !reflect.DeepEqual(m, &messaging.Message{Index: 1, Data: []byte{100}}) {
t.Fatalf("unexpected message(1): %#v", m)
}
@ -205,21 +603,122 @@ func TestConn_Heartbeat(t *testing.T) {
}
}
// Client represents a test wrapper for the broker client.
// Ensure that a connection returns an error if it cannot connect to the broker.
func TestConn_Heartbeat_ErrConnectionRefused(t *testing.T) {
s := httptest.NewServer(nil)
s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err == nil || !strings.Contains(err.Error(), `connection refused`) {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that a connection returns an error if the heartbeat is redirected.
// This occurs when the broker is not the leader. The client will update the URL later.
func TestConn_Heartbeat_ErrNoLeader(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusTemporaryRedirect)
}))
defer s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err != messaging.ErrNoLeader {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that a connection returns a broker error while heartbeating.
func TestConn_Heartbeat_ErrBrokerError(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("X-Broker-Error", "oh no")
w.WriteHeader(http.StatusInternalServerError)
}))
defer s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err == nil || err.Error() != `oh no` {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that a connection returns an http error while heartbeating.
func TestConn_Heartbeat_ErrHTTPError(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer s.Close()
// Create connection and heartbeat.
c := messaging.NewConn(0)
c.SetURL(*MustParseURL(s.URL))
if err := c.Heartbeat(); err == nil || err.Error() != `heartbeat error: status=500` {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that the client config can be serialized to JSON.
func TestClientConfig_MarshalJSON(t *testing.T) {
c := messaging.ClientConfig{URLs: []url.URL{{Host: "hostA"}, {Host: "hostB"}}}
if b, err := json.Marshal(&c); err != nil {
t.Fatal(err)
} else if string(b) != `{"urls":["//hostA","//hostB"]}` {
t.Fatalf("unexpected json: %s", b)
}
}
// Ensure that the client config can be deserialized from JSON.
func TestClientConfig_UnmarshalJSON(t *testing.T) {
var c messaging.ClientConfig
if err := json.Unmarshal([]byte(`{"urls":["//hostA","//hostB"]}`), &c); err != nil {
t.Fatal(err)
}
if len(c.URLs) != 2 {
t.Fatalf("unexpected url count: %d", len(c.URLs))
} else if c.URLs[0] != (url.URL{Host: "hostA"}) {
t.Fatalf("unexpected url(0): %s", c.URLs[0])
} else if c.URLs[1] != (url.URL{Host: "hostB"}) {
t.Fatalf("unexpected url(1): %s", c.URLs[1])
}
}
// Ensure that the client config returns an error when handling an invalid field type.
func TestClientConfig_UnmarshalJSON_ErrInvalidType(t *testing.T) {
var c messaging.ClientConfig
if err := json.Unmarshal([]byte(`{"urls":0}`), &c); err == nil || err.Error() != `json: cannot unmarshal number into Go value of type []string` {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that the client config returns an error when handling an invalid url.
func TestClientConfig_UnmarshalJSON_ErrInvalidURL(t *testing.T) {
var c messaging.ClientConfig
if err := json.Unmarshal([]byte(`{"urls":["http://%foo"]}`), &c); err == nil || err.Error() != `parse http://%foo: hexadecimal escape in host` {
t.Fatalf("unexpected error: %s", err)
}
}
// Client represents a test wrapper for messaging.Client.
type Client struct {
*messaging.Client
}
// NewClient returns a new instance of Client.
func NewClient(replicaID uint64) *Client {
return &Client{
Client: messaging.NewClient(),
}
// NewClient returns an new instance of Client.
func NewClient() *Client {
return &Client{messaging.NewClient()}
}
// Close shuts down the client and server.
func (c *Client) Close() {
c.Client.Close()
// MustOpen opens the client. Panic on error.
func (c *Client) MustOpen(path string) {
if err := c.Open(path); err != nil {
panic(err.Error())
}
}
// NewTempFile returns the path of a new temporary file.
@ -229,6 +728,7 @@ func NewTempFile() string {
if err != nil {
panic(err)
}
defer f.Close()
f.Close()
os.Remove(f.Name())
return f.Name()
}

View File

@ -46,9 +46,6 @@ var (
// ErrConnCannotReuse is returned when opening a previously closed connection.
ErrConnCannotReuse = errors.New("cannot reuse connection")
// ErrBrokerURLRequired is returned when opening a broker without URLs.
ErrBrokerURLRequired = errors.New("broker url required")
// ErrMessageTypeRequired is returned publishing a message without a type.
ErrMessageTypeRequired = errors.New("message type required")

View File

@ -1,6 +1,7 @@
package messaging
import (
"encoding/json"
"io"
"io/ioutil"
"log"
@ -15,6 +16,8 @@ import (
// Handler represents an HTTP handler by the broker.
type Handler struct {
Broker interface {
URLs() []url.URL
IsLeader() bool
LeaderURL() url.URL
TopicReader(topicID, index uint64, streaming bool) io.ReadCloser
Publish(m *Message) (uint64, error)
@ -169,6 +172,20 @@ func (h *Handler) postHeartbeat(w http.ResponseWriter, r *http.Request) {
// servePing returns a status 200.
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
// Redirect if not leader.
if !h.Broker.IsLeader() {
h.redirectToLeader(w, r)
return
}
// Write out client configuration.
var config ClientConfig
config.URLs = h.Broker.URLs()
if err := json.NewEncoder(w).Encode(&config); err != nil {
log.Printf("unable to write client config: %s", err)
return
}
w.WriteHeader(http.StatusOK)
}

View File

@ -3,6 +3,7 @@ package messaging_test
import (
"bytes"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
@ -195,19 +196,48 @@ func TestHandler_postHeartbeat_ErrMethodNotAllowed(t *testing.T) {
resp.Body.Close()
}
// Ensure a handler can respond to a ping.
// Ensure a handler can respond to a ping with the current cluster configuration.
func TestHandler_servePing(t *testing.T) {
s := httptest.NewServer(&messaging.Handler{})
var hb HandlerBroker
hb.IsLeaderFunc = func() bool { return true }
hb.URLsFunc = func() []url.URL { return []url.URL{{Host: "hostA"}, {Host: "hostB"}} }
s := httptest.NewServer(&messaging.Handler{Broker: &hb})
defer s.Close()
// Send request to the broker.
resp, err := http.Post(s.URL+`/messaging/ping`, "application/octet-stream", nil)
if err != nil {
t.Fatal(err)
} else if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status: %d: %s", resp.StatusCode, resp.Header.Get("X-Broker-Error"))
}
resp.Body.Close()
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status: %d: %s", resp.StatusCode, resp.Header.Get("X-Broker-Error"))
} else if b, _ := ioutil.ReadAll(resp.Body); string(b) != `{"urls":["//hostA","//hostB"]}`+"\n" {
t.Fatalf("unexpected body: %s", b)
}
}
// Ensure a handler can respond to a ping with the current cluster configuration.
func TestHandler_servePing_NotLeader(t *testing.T) {
var hb HandlerBroker
hb.IsLeaderFunc = func() bool { return false }
hb.LeaderURLFunc = func() url.URL { return url.URL{Scheme: "http", Host: "other"} }
s := httptest.NewServer(&messaging.Handler{Broker: &hb})
defer s.Close()
// Send request to the broker.
resp, err := http.Post(s.URL+`/messaging/ping`, "application/octet-stream", nil)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusTemporaryRedirect {
t.Fatalf("unexpected status: %d: %s", resp.StatusCode, resp.Header.Get("X-Broker-Error"))
} else if loc := resp.Header.Get("Location"); loc != "http://other/messaging/ping" {
t.Fatalf("unexpected redirect location: %s", loc)
}
}
// Ensure the handler routes raft requests to the raft handler.
@ -239,12 +269,16 @@ func TestHandler_ErrNotFound(t *testing.T) {
// HandlerBroker is a mockable type that implements Handler.Broker.
type HandlerBroker struct {
URLsFunc func() []url.URL
IsLeaderFunc func() bool
LeaderURLFunc func() url.URL
PublishFunc func(m *messaging.Message) (uint64, error)
TopicReaderFunc func(topicID, index uint64, streaming bool) io.ReadCloser
SetTopicMaxIndexFunc func(topicID, index uint64) error
}
func (b *HandlerBroker) URLs() []url.URL { return b.URLsFunc() }
func (b *HandlerBroker) IsLeader() bool { return b.IsLeaderFunc() }
func (b *HandlerBroker) LeaderURL() url.URL { return b.LeaderURLFunc() }
func (b *HandlerBroker) Publish(m *messaging.Message) (uint64, error) { return b.PublishFunc(m) }
func (b *HandlerBroker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser {

View File

@ -172,6 +172,23 @@ func (l *Log) SetURL(u url.URL) {
l.url = u
}
// URLs returns a list of all URLs in the cluster.
func (l *Log) URLs() []url.URL {
l.mu.Lock()
defer l.mu.Unlock()
if l.config == nil {
return nil
}
var a []url.URL
for _, n := range l.config.Nodes {
a = append(a, n.URL)
}
return a
}
func (l *Log) idPath() string { return filepath.Join(l.path, "id") }
func (l *Log) termPath() string { return filepath.Join(l.path, "term") }
func (l *Log) configPath() string { return filepath.Join(l.path, "config") }
@ -549,6 +566,13 @@ func (l *Log) tracef(msg string, v ...interface{}) {
}
}
// IsLeader returns true if the log is the current leader.
func (l *Log) IsLeader() bool {
l.mu.Lock()
defer l.mu.Unlock()
return l.id != 0 && l.id == l.leaderID
}
// Leader returns the id and URL associated with the current leader.
// Returns zero if there is no current leader.
func (l *Log) Leader() (id uint64, u url.URL) {
@ -557,17 +581,6 @@ func (l *Log) Leader() (id uint64, u url.URL) {
return l.leader()
}
// ClusterID returns the identifier for the cluster.
// Returns zero if the cluster has not been initialized yet.
func (l *Log) ClusterID() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
if l.config == nil {
return 0
}
return l.config.ClusterID
}
func (l *Log) leader() (id uint64, u url.URL) {
// Ignore if there's no configuration set.
if l.config == nil {
@ -583,6 +596,17 @@ func (l *Log) leader() (id uint64, u url.URL) {
return n.ID, n.URL
}
// ClusterID returns the identifier for the cluster.
// Returns zero if the cluster has not been initialized yet.
func (l *Log) ClusterID() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
if l.config == nil {
return 0
}
return l.config.ClusterID
}
// Join contacts a node in the cluster to request membership.
// A log cannot join a cluster if it has already been initialized.
func (l *Log) Join(u url.URL) error {

View File

@ -2964,9 +2964,13 @@ func (r *Results) Error() error {
// MessagingClient represents the client used to connect to brokers.
type MessagingClient interface {
Open(path string, urls []url.URL) error
Open(path string) error
Close() error
// Retrieves or sets the current list of broker URLs.
URLs() []url.URL
SetURLs([]url.URL)
// Publishes a message to the broker.
Publish(m *messaging.Message) (index uint64, err error)

View File

@ -32,7 +32,7 @@ func NewMessagingClient() *MessagingClient {
return c
}
func (c *MessagingClient) Open(path string, urls []url.URL) error { return nil }
func (c *MessagingClient) Open(path string) error { return nil }
// Close closes all open connections.
func (c *MessagingClient) Close() error {
@ -46,6 +46,9 @@ func (c *MessagingClient) Close() error {
return nil
}
func (c *MessagingClient) URLs() []url.URL { return []url.URL{{Host: "local"}} }
func (c *MessagingClient) SetURLs([]url.URL) {}
func (c *MessagingClient) Publish(m *messaging.Message) (uint64, error) { return c.PublishFunc(m) }
// DefaultPublishFunc sets an autoincrementing index on the message and sends it to each topic connection.