diff --git a/messaging/client.go b/messaging/client.go index 3518f562f6..4a7053dab6 100644 --- a/messaging/client.go +++ b/messaging/client.go @@ -25,6 +25,7 @@ const DefaultReconnectTimeout = 100 * time.Millisecond // across restarts. type ClientConfig struct { Brokers []*url.URL `json:"brokers"` + Leader *url.URL `json:"leader"` } // NewClientConfig returns a new instance of ClientConfig. @@ -83,9 +84,23 @@ func (c *Client) LeaderURL() *url.URL { c.mu.Lock() defer c.mu.Unlock() - // TODO(benbjohnson): Actually keep track of the leader. - // HACK(benbjohnson): For testing, just grab a url. - return c.config.Brokers[0] + // If an explicit leader has not been set, just pick the first of the brokers. + if c.config.Leader == nil { + return c.config.Brokers[0] + } else { + return c.config.Leader + } +} + +// SetLeaderURL sets the explicit broker leader. +func (c *Client) SetLeaderURL(u *url.URL) { + c.mu.Lock() + defer c.mu.Unlock() + if u == nil { + return + } + c.Logger.Printf("setting broker leader to %s", u.String()) + c.config.Leader = u } // SetLogOutput sets writer for all Client log output. @@ -182,8 +197,8 @@ func (c *Client) Publish(m *Message) (uint64, error) { var resp *http.Response var err error - u := *c.LeaderURL() for { + u := *c.LeaderURL() // Send the message to the messages endpoint. u.Path = "/messaging/messages" u.RawQuery = url.Values{ @@ -203,7 +218,7 @@ func (c *Client) Publish(m *Message) (uint64, error) { if err != nil { return 0, fmt.Errorf("bad redirect: %s", resp.Header.Get("Location")) } - u = *redirectURL + c.SetLeaderURL(redirectURL) continue } else if resp.StatusCode != http.StatusOK { if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" { @@ -229,14 +244,14 @@ func (c *Client) CreateReplica(id uint64, u *url.URL) error { var resp *http.Response var err error - leaderURL := *c.LeaderURL() for { - leaderURL.Path = "/messaging/replicas" - leaderURL.RawQuery = url.Values{ + u := *c.LeaderURL() + u.Path = "/messaging/replicas" + u.RawQuery = url.Values{ "id": {strconv.FormatUint(id, 10)}, "url": {u.String()}, }.Encode() - resp, err = http.Post(leaderURL.String(), "application/octet-stream", nil) + resp, err = http.Post(u.String(), "application/octet-stream", nil) if err != nil { return err } @@ -249,7 +264,7 @@ func (c *Client) CreateReplica(id uint64, u *url.URL) error { if err != nil { return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location")) } - leaderURL = *redirectURL + c.SetLeaderURL(redirectURL) continue } else if resp.StatusCode != http.StatusCreated { return errors.New(resp.Header.Get("X-Broker-Error")) @@ -265,8 +280,8 @@ func (c *Client) DeleteReplica(id uint64) error { var resp *http.Response var err error - u := *c.LeaderURL() for { + u := *c.LeaderURL() u.Path = "/messaging/replicas" u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode() req, _ := http.NewRequest("DELETE", u.String(), nil) @@ -283,7 +298,7 @@ func (c *Client) DeleteReplica(id uint64) error { if err != nil { return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location")) } - u = *redirectURL + c.SetLeaderURL(redirectURL) continue } else if resp.StatusCode != http.StatusNoContent { return errors.New(resp.Header.Get("X-Broker-Error")) @@ -299,8 +314,8 @@ func (c *Client) Subscribe(replicaID, topicID uint64) error { var resp *http.Response var err error - u := *c.LeaderURL() for { + u := *c.LeaderURL() u.Path = "/messaging/subscriptions" u.RawQuery = url.Values{ "replicaID": {strconv.FormatUint(replicaID, 10)}, @@ -319,7 +334,7 @@ func (c *Client) Subscribe(replicaID, topicID uint64) error { if err != nil { return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location")) } - u = *redirectURL + c.SetLeaderURL(redirectURL) continue } else if resp.StatusCode != http.StatusCreated { return errors.New(resp.Header.Get("X-Broker-Error")) @@ -335,8 +350,8 @@ func (c *Client) Unsubscribe(replicaID, topicID uint64) error { var resp *http.Response var err error - u := *c.LeaderURL() for { + u := *c.LeaderURL() u.Path = "/messaging/subscriptions" u.RawQuery = url.Values{ "replicaID": {strconv.FormatUint(replicaID, 10)}, @@ -356,7 +371,7 @@ func (c *Client) Unsubscribe(replicaID, topicID uint64) error { if err != nil { return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location")) } - u = *redirectURL + c.SetLeaderURL(redirectURL) continue } else if resp.StatusCode != http.StatusNoContent { return errors.New(resp.Header.Get("X-Broker-Error"))