From 0c64014fac39bb76e3caa447d1fb5ffbaca88d82 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 28 Jan 2015 03:41:28 -0500 Subject: [PATCH] Add additional broker HTTP redirection. --- messaging/client.go | 164 ++++++++++++++++++++++++++------------- messaging/handler.go | 43 +++++++--- tests/create_write_query | 2 + 3 files changed, 141 insertions(+), 68 deletions(-) diff --git a/messaging/client.go b/messaging/client.go index 883ebb144f..7dab0e4889 100644 --- a/messaging/client.go +++ b/messaging/client.go @@ -220,19 +220,32 @@ func (c *Client) Publish(m *Message) (uint64, error) { // CreateReplica creates a replica on the broker. func (c *Client) CreateReplica(id uint64) error { - // Send request to the last known leader. - u := *c.LeaderURL() - u.Path = "/messaging/replicas" - u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode() - resp, err := http.Post(u.String(), "application/octet-stream", nil) - if err != nil { - return err - } - defer func() { _ = resp.Body.Close() }() + var resp *http.Response + var err error - // If a non-201 status is returned then an error occurred. - if resp.StatusCode != http.StatusCreated { - return errors.New(resp.Header.Get("X-Broker-Error")) + u := *c.LeaderURL() + for { + u.Path = "/messaging/replicas" + u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode() + resp, err = http.Post(u.String(), "application/octet-stream", nil) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + + // If a temporary redirect occurs then update the leader and retry. + // If a non-201 status is returned then an error occurred. + if resp.StatusCode == http.StatusTemporaryRedirect { + redirectURL, err := url.Parse(resp.Header.Get("Location")) + if err != nil { + return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location")) + } + u = *redirectURL + continue + } else if resp.StatusCode != http.StatusCreated { + return errors.New(resp.Header.Get("X-Broker-Error")) + } + break } return nil @@ -240,20 +253,33 @@ func (c *Client) CreateReplica(id uint64) error { // DeleteReplica removes a replica on the broker. func (c *Client) DeleteReplica(id uint64) error { - // Send request to the last known leader. - u := *c.LeaderURL() - u.Path = "/messaging/replicas" - u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode() - req, _ := http.NewRequest("DELETE", u.String(), nil) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer func() { _ = resp.Body.Close() }() + var resp *http.Response + var err error - // If a non-204 status is returned then an error occurred. - if resp.StatusCode != http.StatusNoContent { - return errors.New(resp.Header.Get("X-Broker-Error")) + u := *c.LeaderURL() + for { + u.Path = "/messaging/replicas" + u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode() + req, _ := http.NewRequest("DELETE", u.String(), nil) + resp, err = http.DefaultClient.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + + // If a temporary redirect occurs then update the leader and retry. + // If a non-204 status is returned then an error occurred. + if resp.StatusCode == http.StatusTemporaryRedirect { + redirectURL, err := url.Parse(resp.Header.Get("Location")) + if err != nil { + return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location")) + } + u = *redirectURL + continue + } else if resp.StatusCode != http.StatusNoContent { + return errors.New(resp.Header.Get("X-Broker-Error")) + } + break } return nil @@ -261,22 +287,35 @@ func (c *Client) DeleteReplica(id uint64) error { // Subscribe subscribes a replica to a topic on the broker. func (c *Client) Subscribe(replicaID, topicID uint64) error { - // Send request to the last known leader. - u := *c.LeaderURL() - u.Path = "/messaging/subscriptions" - u.RawQuery = url.Values{ - "replicaID": {strconv.FormatUint(replicaID, 10)}, - "topicID": {strconv.FormatUint(topicID, 10)}, - }.Encode() - resp, err := http.Post(u.String(), "application/octet-stream", nil) - if err != nil { - return err - } - defer func() { _ = resp.Body.Close() }() + var resp *http.Response + var err error - // If a non-201 status is returned then an error occurred. - if resp.StatusCode != http.StatusCreated { - return errors.New(resp.Header.Get("X-Broker-Error")) + u := *c.LeaderURL() + for { + u.Path = "/messaging/subscriptions" + u.RawQuery = url.Values{ + "replicaID": {strconv.FormatUint(replicaID, 10)}, + "topicID": {strconv.FormatUint(topicID, 10)}, + }.Encode() + resp, err = http.Post(u.String(), "application/octet-stream", nil) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + + // If a temporary redirect occurs then update the leader and retry. + // If a non-201 status is returned then an error occurred. + if resp.StatusCode == http.StatusTemporaryRedirect { + redirectURL, err := url.Parse(resp.Header.Get("Location")) + if err != nil { + return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location")) + } + u = *redirectURL + continue + } else if resp.StatusCode != http.StatusCreated { + return errors.New(resp.Header.Get("X-Broker-Error")) + } + break } return nil @@ -284,23 +323,36 @@ func (c *Client) Subscribe(replicaID, topicID uint64) error { // Unsubscribe unsubscribes a replica from a topic on the broker. func (c *Client) Unsubscribe(replicaID, topicID uint64) error { - // Send request to the last known leader. - u := *c.LeaderURL() - u.Path = "/messaging/subscriptions" - u.RawQuery = url.Values{ - "replicaID": {strconv.FormatUint(replicaID, 10)}, - "topicID": {strconv.FormatUint(topicID, 10)}, - }.Encode() - req, _ := http.NewRequest("DELETE", u.String(), nil) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer func() { _ = resp.Body.Close() }() + var resp *http.Response + var err error - // If a non-204 status is returned then an error occurred. - if resp.StatusCode != http.StatusNoContent { - return errors.New(resp.Header.Get("X-Broker-Error")) + u := *c.LeaderURL() + for { + u.Path = "/messaging/subscriptions" + u.RawQuery = url.Values{ + "replicaID": {strconv.FormatUint(replicaID, 10)}, + "topicID": {strconv.FormatUint(topicID, 10)}, + }.Encode() + req, _ := http.NewRequest("DELETE", u.String(), nil) + resp, err = http.DefaultClient.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + + // If a temporary redirect occurs then update the leader and retry. + // If a non-204 status is returned then an error occurred. + if resp.StatusCode == http.StatusTemporaryRedirect { + redirectURL, err := url.Parse(resp.Header.Get("Location")) + if err != nil { + return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location")) + } + u = *redirectURL + continue + } else if resp.StatusCode != http.StatusNoContent { + return errors.New(resp.Header.Get("X-Broker-Error")) + } + break } return nil diff --git a/messaging/handler.go b/messaging/handler.go index 5e66441646..0b5db8aefe 100644 --- a/messaging/handler.go +++ b/messaging/handler.go @@ -131,14 +131,7 @@ func (h *Handler) publish(w http.ResponseWriter, r *http.Request) { // Publish message to the broker. index, err := h.broker.Publish(m) if err == raft.ErrNotLeader { - if u := h.broker.LeaderURL(); u != nil { - redirectURL := *r.URL - redirectURL.Scheme = u.Scheme - redirectURL.Host = u.Host - http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect) - } else { - h.error(w, err, http.StatusInternalServerError) - } + h.redirectToLeader(w, r) return } else if err != nil { h.error(w, err, http.StatusInternalServerError) @@ -161,7 +154,10 @@ func (h *Handler) createReplica(w http.ResponseWriter, r *http.Request) { } // Create a new replica on the broker. - if err := h.broker.CreateReplica(replicaID); err == ErrReplicaExists { + if err := h.broker.CreateReplica(replicaID); err == raft.ErrNotLeader { + h.redirectToLeader(w, r) + return + } else if err == ErrReplicaExists { h.error(w, err, http.StatusConflict) return } else if err != nil { @@ -183,7 +179,10 @@ func (h *Handler) deleteReplica(w http.ResponseWriter, r *http.Request) { } // Delete the replica on the broker. - if err := h.broker.DeleteReplica(replicaID); err != nil { + if err := h.broker.DeleteReplica(replicaID); err == raft.ErrNotLeader { + h.redirectToLeader(w, r) + return + } else if err != nil { h.error(w, err, http.StatusInternalServerError) return } @@ -211,7 +210,10 @@ func (h *Handler) subscribe(w http.ResponseWriter, r *http.Request) { } // Subscribe a replica to a topic. - if err := h.broker.Subscribe(replicaID, topicID); err == ErrReplicaNotFound { + if err := h.broker.Subscribe(replicaID, topicID); err == raft.ErrNotLeader { + h.redirectToLeader(w, r) + return + } else if err == ErrReplicaNotFound { h.error(w, err, http.StatusNotFound) return } else if err != nil { @@ -242,7 +244,10 @@ func (h *Handler) unsubscribe(w http.ResponseWriter, r *http.Request) { } // Unsubscribe the replica from the topic. - if err := h.broker.Unsubscribe(replicaID, topicID); err == ErrReplicaNotFound { + if err := h.broker.Unsubscribe(replicaID, topicID); err == raft.ErrNotLeader { + h.redirectToLeader(w, r) + return + } else if err == ErrReplicaNotFound { h.error(w, err, http.StatusNotFound) return } else if err != nil { @@ -258,3 +263,17 @@ func (h *Handler) error(w http.ResponseWriter, err error, code int) { w.Header().Set("X-Broker-Error", s) http.Error(w, s, code) } + +// redirects to the current known leader. +// If no leader is found then returns a 500. +func (h *Handler) redirectToLeader(w http.ResponseWriter, r *http.Request) { + if u := h.broker.LeaderURL(); u != nil { + redirectURL := *r.URL + redirectURL.Scheme = u.Scheme + redirectURL.Host = u.Host + http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect) + return + } + + h.error(w, raft.ErrNotLeader, http.StatusInternalServerError) +} diff --git a/tests/create_write_query b/tests/create_write_query index 00297268a2..a251ecd23b 100755 --- a/tests/create_write_query +++ b/tests/create_write_query @@ -6,6 +6,8 @@ curl -G http://localhost:8086/query --data-urlencode "q=CREATE RETENTION POLICY echo "inserting data" curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2015-01-26T22:01:11.703Z","values": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write +curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2015-01-27T22:01:11.703Z","values": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write +curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2015-01-28T22:01:11.703Z","values": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write echo "querying data" curl -G http://localhost:8086/query --data-urlencode "db=foo" --data-urlencode "q=SELECT sum(value) FROM \"foo\".\"bar\".cpu GROUP BY time(1h)"