Client should track current broker leader

pull/1877/head
Philip O'Toole 2015-03-06 13:51:13 -08:00
parent bbd852e77d
commit 317fe8335a
1 changed files with 31 additions and 16 deletions

View File

@ -25,6 +25,7 @@ const DefaultReconnectTimeout = 100 * time.Millisecond
// across restarts.
type ClientConfig struct {
Brokers []*url.URL `json:"brokers"`
Leader *url.URL `json:"leader"`
}
// NewClientConfig returns a new instance of ClientConfig.
@ -83,9 +84,23 @@ func (c *Client) LeaderURL() *url.URL {
c.mu.Lock()
defer c.mu.Unlock()
// TODO(benbjohnson): Actually keep track of the leader.
// HACK(benbjohnson): For testing, just grab a url.
// If an explicit leader has not been set, just pick the first of the brokers.
if c.config.Leader == nil {
return c.config.Brokers[0]
} else {
return c.config.Leader
}
}
// SetLeaderURL sets the explicit broker leader.
func (c *Client) SetLeaderURL(u *url.URL) {
c.mu.Lock()
defer c.mu.Unlock()
if u == nil {
return
}
c.Logger.Printf("setting broker leader to %s", u.String())
c.config.Leader = u
}
// SetLogOutput sets writer for all Client log output.
@ -182,8 +197,8 @@ func (c *Client) Publish(m *Message) (uint64, error) {
var resp *http.Response
var err error
u := *c.LeaderURL()
for {
u := *c.LeaderURL()
// Send the message to the messages endpoint.
u.Path = "/messaging/messages"
u.RawQuery = url.Values{
@ -203,7 +218,7 @@ func (c *Client) Publish(m *Message) (uint64, error) {
if err != nil {
return 0, fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
}
u = *redirectURL
c.SetLeaderURL(redirectURL)
continue
} else if resp.StatusCode != http.StatusOK {
if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" {
@ -229,14 +244,14 @@ func (c *Client) CreateReplica(id uint64, u *url.URL) error {
var resp *http.Response
var err error
leaderURL := *c.LeaderURL()
for {
leaderURL.Path = "/messaging/replicas"
leaderURL.RawQuery = url.Values{
u := *c.LeaderURL()
u.Path = "/messaging/replicas"
u.RawQuery = url.Values{
"id": {strconv.FormatUint(id, 10)},
"url": {u.String()},
}.Encode()
resp, err = http.Post(leaderURL.String(), "application/octet-stream", nil)
resp, err = http.Post(u.String(), "application/octet-stream", nil)
if err != nil {
return err
}
@ -249,7 +264,7 @@ func (c *Client) CreateReplica(id uint64, u *url.URL) error {
if err != nil {
return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
}
leaderURL = *redirectURL
c.SetLeaderURL(redirectURL)
continue
} else if resp.StatusCode != http.StatusCreated {
return errors.New(resp.Header.Get("X-Broker-Error"))
@ -265,8 +280,8 @@ func (c *Client) DeleteReplica(id uint64) error {
var resp *http.Response
var err error
u := *c.LeaderURL()
for {
u := *c.LeaderURL()
u.Path = "/messaging/replicas"
u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode()
req, _ := http.NewRequest("DELETE", u.String(), nil)
@ -283,7 +298,7 @@ func (c *Client) DeleteReplica(id uint64) error {
if err != nil {
return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
}
u = *redirectURL
c.SetLeaderURL(redirectURL)
continue
} else if resp.StatusCode != http.StatusNoContent {
return errors.New(resp.Header.Get("X-Broker-Error"))
@ -299,8 +314,8 @@ func (c *Client) Subscribe(replicaID, topicID uint64) error {
var resp *http.Response
var err error
u := *c.LeaderURL()
for {
u := *c.LeaderURL()
u.Path = "/messaging/subscriptions"
u.RawQuery = url.Values{
"replicaID": {strconv.FormatUint(replicaID, 10)},
@ -319,7 +334,7 @@ func (c *Client) Subscribe(replicaID, topicID uint64) error {
if err != nil {
return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
}
u = *redirectURL
c.SetLeaderURL(redirectURL)
continue
} else if resp.StatusCode != http.StatusCreated {
return errors.New(resp.Header.Get("X-Broker-Error"))
@ -335,8 +350,8 @@ func (c *Client) Unsubscribe(replicaID, topicID uint64) error {
var resp *http.Response
var err error
u := *c.LeaderURL()
for {
u := *c.LeaderURL()
u.Path = "/messaging/subscriptions"
u.RawQuery = url.Values{
"replicaID": {strconv.FormatUint(replicaID, 10)},
@ -356,7 +371,7 @@ func (c *Client) Unsubscribe(replicaID, topicID uint64) error {
if err != nil {
return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
}
u = *redirectURL
c.SetLeaderURL(redirectURL)
continue
} else if resp.StatusCode != http.StatusNoContent {
return errors.New(resp.Header.Get("X-Broker-Error"))