Merge pull request #1397 from influxdb/more-broker-redirection

Add additional broker HTTP redirection.
pull/1399/head
Ben Johnson 2015-01-28 03:45:07 -05:00
commit d37ab3a6c0
3 changed files with 141 additions and 68 deletions

View File

@ -220,88 +220,140 @@ func (c *Client) Publish(m *Message) (uint64, error) {
// CreateReplica creates a replica on the broker. // CreateReplica creates a replica on the broker.
func (c *Client) CreateReplica(id uint64) error { func (c *Client) CreateReplica(id uint64) error {
// Send request to the last known leader. var resp *http.Response
var err error
u := *c.LeaderURL() u := *c.LeaderURL()
for {
u.Path = "/messaging/replicas" u.Path = "/messaging/replicas"
u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode() u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode()
resp, err := http.Post(u.String(), "application/octet-stream", nil) resp, err = http.Post(u.String(), "application/octet-stream", nil)
if err != nil { if err != nil {
return err return err
} }
defer func() { _ = resp.Body.Close() }() defer func() { _ = resp.Body.Close() }()
// If a temporary redirect occurs then update the leader and retry.
// If a non-201 status is returned then an error occurred. // If a non-201 status is returned then an error occurred.
if resp.StatusCode != http.StatusCreated { if resp.StatusCode == http.StatusTemporaryRedirect {
redirectURL, err := url.Parse(resp.Header.Get("Location"))
if err != nil {
return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
}
u = *redirectURL
continue
} else if resp.StatusCode != http.StatusCreated {
return errors.New(resp.Header.Get("X-Broker-Error")) return errors.New(resp.Header.Get("X-Broker-Error"))
} }
break
}
return nil return nil
} }
// DeleteReplica removes a replica on the broker. // DeleteReplica removes a replica on the broker.
func (c *Client) DeleteReplica(id uint64) error { func (c *Client) DeleteReplica(id uint64) error {
// Send request to the last known leader. var resp *http.Response
var err error
u := *c.LeaderURL() u := *c.LeaderURL()
for {
u.Path = "/messaging/replicas" u.Path = "/messaging/replicas"
u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode() u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode()
req, _ := http.NewRequest("DELETE", u.String(), nil) req, _ := http.NewRequest("DELETE", u.String(), nil)
resp, err := http.DefaultClient.Do(req) resp, err = http.DefaultClient.Do(req)
if err != nil { if err != nil {
return err return err
} }
defer func() { _ = resp.Body.Close() }() defer func() { _ = resp.Body.Close() }()
// If a temporary redirect occurs then update the leader and retry.
// If a non-204 status is returned then an error occurred. // If a non-204 status is returned then an error occurred.
if resp.StatusCode != http.StatusNoContent { if resp.StatusCode == http.StatusTemporaryRedirect {
redirectURL, err := url.Parse(resp.Header.Get("Location"))
if err != nil {
return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
}
u = *redirectURL
continue
} else if resp.StatusCode != http.StatusNoContent {
return errors.New(resp.Header.Get("X-Broker-Error")) return errors.New(resp.Header.Get("X-Broker-Error"))
} }
break
}
return nil return nil
} }
// Subscribe subscribes a replica to a topic on the broker. // Subscribe subscribes a replica to a topic on the broker.
func (c *Client) Subscribe(replicaID, topicID uint64) error { func (c *Client) Subscribe(replicaID, topicID uint64) error {
// Send request to the last known leader. var resp *http.Response
var err error
u := *c.LeaderURL() u := *c.LeaderURL()
for {
u.Path = "/messaging/subscriptions" u.Path = "/messaging/subscriptions"
u.RawQuery = url.Values{ u.RawQuery = url.Values{
"replicaID": {strconv.FormatUint(replicaID, 10)}, "replicaID": {strconv.FormatUint(replicaID, 10)},
"topicID": {strconv.FormatUint(topicID, 10)}, "topicID": {strconv.FormatUint(topicID, 10)},
}.Encode() }.Encode()
resp, err := http.Post(u.String(), "application/octet-stream", nil) resp, err = http.Post(u.String(), "application/octet-stream", nil)
if err != nil { if err != nil {
return err return err
} }
defer func() { _ = resp.Body.Close() }() defer func() { _ = resp.Body.Close() }()
// If a temporary redirect occurs then update the leader and retry.
// If a non-201 status is returned then an error occurred. // If a non-201 status is returned then an error occurred.
if resp.StatusCode != http.StatusCreated { if resp.StatusCode == http.StatusTemporaryRedirect {
redirectURL, err := url.Parse(resp.Header.Get("Location"))
if err != nil {
return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
}
u = *redirectURL
continue
} else if resp.StatusCode != http.StatusCreated {
return errors.New(resp.Header.Get("X-Broker-Error")) return errors.New(resp.Header.Get("X-Broker-Error"))
} }
break
}
return nil return nil
} }
// Unsubscribe unsubscribes a replica from a topic on the broker. // Unsubscribe unsubscribes a replica from a topic on the broker.
func (c *Client) Unsubscribe(replicaID, topicID uint64) error { func (c *Client) Unsubscribe(replicaID, topicID uint64) error {
// Send request to the last known leader. var resp *http.Response
var err error
u := *c.LeaderURL() u := *c.LeaderURL()
for {
u.Path = "/messaging/subscriptions" u.Path = "/messaging/subscriptions"
u.RawQuery = url.Values{ u.RawQuery = url.Values{
"replicaID": {strconv.FormatUint(replicaID, 10)}, "replicaID": {strconv.FormatUint(replicaID, 10)},
"topicID": {strconv.FormatUint(topicID, 10)}, "topicID": {strconv.FormatUint(topicID, 10)},
}.Encode() }.Encode()
req, _ := http.NewRequest("DELETE", u.String(), nil) req, _ := http.NewRequest("DELETE", u.String(), nil)
resp, err := http.DefaultClient.Do(req) resp, err = http.DefaultClient.Do(req)
if err != nil { if err != nil {
return err return err
} }
defer func() { _ = resp.Body.Close() }() defer func() { _ = resp.Body.Close() }()
// If a temporary redirect occurs then update the leader and retry.
// If a non-204 status is returned then an error occurred. // If a non-204 status is returned then an error occurred.
if resp.StatusCode != http.StatusNoContent { if resp.StatusCode == http.StatusTemporaryRedirect {
redirectURL, err := url.Parse(resp.Header.Get("Location"))
if err != nil {
return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
}
u = *redirectURL
continue
} else if resp.StatusCode != http.StatusNoContent {
return errors.New(resp.Header.Get("X-Broker-Error")) return errors.New(resp.Header.Get("X-Broker-Error"))
} }
break
}
return nil return nil
} }

View File

@ -131,14 +131,7 @@ func (h *Handler) publish(w http.ResponseWriter, r *http.Request) {
// Publish message to the broker. // Publish message to the broker.
index, err := h.broker.Publish(m) index, err := h.broker.Publish(m)
if err == raft.ErrNotLeader { if err == raft.ErrNotLeader {
if u := h.broker.LeaderURL(); u != nil { h.redirectToLeader(w, r)
redirectURL := *r.URL
redirectURL.Scheme = u.Scheme
redirectURL.Host = u.Host
http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect)
} else {
h.error(w, err, http.StatusInternalServerError)
}
return return
} else if err != nil { } else if err != nil {
h.error(w, err, http.StatusInternalServerError) h.error(w, err, http.StatusInternalServerError)
@ -161,7 +154,10 @@ func (h *Handler) createReplica(w http.ResponseWriter, r *http.Request) {
} }
// Create a new replica on the broker. // Create a new replica on the broker.
if err := h.broker.CreateReplica(replicaID); err == ErrReplicaExists { if err := h.broker.CreateReplica(replicaID); err == raft.ErrNotLeader {
h.redirectToLeader(w, r)
return
} else if err == ErrReplicaExists {
h.error(w, err, http.StatusConflict) h.error(w, err, http.StatusConflict)
return return
} else if err != nil { } else if err != nil {
@ -183,7 +179,10 @@ func (h *Handler) deleteReplica(w http.ResponseWriter, r *http.Request) {
} }
// Delete the replica on the broker. // Delete the replica on the broker.
if err := h.broker.DeleteReplica(replicaID); err != nil { if err := h.broker.DeleteReplica(replicaID); err == raft.ErrNotLeader {
h.redirectToLeader(w, r)
return
} else if err != nil {
h.error(w, err, http.StatusInternalServerError) h.error(w, err, http.StatusInternalServerError)
return return
} }
@ -211,7 +210,10 @@ func (h *Handler) subscribe(w http.ResponseWriter, r *http.Request) {
} }
// Subscribe a replica to a topic. // Subscribe a replica to a topic.
if err := h.broker.Subscribe(replicaID, topicID); err == ErrReplicaNotFound { if err := h.broker.Subscribe(replicaID, topicID); err == raft.ErrNotLeader {
h.redirectToLeader(w, r)
return
} else if err == ErrReplicaNotFound {
h.error(w, err, http.StatusNotFound) h.error(w, err, http.StatusNotFound)
return return
} else if err != nil { } else if err != nil {
@ -242,7 +244,10 @@ func (h *Handler) unsubscribe(w http.ResponseWriter, r *http.Request) {
} }
// Unsubscribe the replica from the topic. // Unsubscribe the replica from the topic.
if err := h.broker.Unsubscribe(replicaID, topicID); err == ErrReplicaNotFound { if err := h.broker.Unsubscribe(replicaID, topicID); err == raft.ErrNotLeader {
h.redirectToLeader(w, r)
return
} else if err == ErrReplicaNotFound {
h.error(w, err, http.StatusNotFound) h.error(w, err, http.StatusNotFound)
return return
} else if err != nil { } else if err != nil {
@ -258,3 +263,17 @@ func (h *Handler) error(w http.ResponseWriter, err error, code int) {
w.Header().Set("X-Broker-Error", s) w.Header().Set("X-Broker-Error", s)
http.Error(w, s, code) http.Error(w, s, code)
} }
// redirects to the current known leader.
// If no leader is found then returns a 500.
func (h *Handler) redirectToLeader(w http.ResponseWriter, r *http.Request) {
if u := h.broker.LeaderURL(); u != nil {
redirectURL := *r.URL
redirectURL.Scheme = u.Scheme
redirectURL.Host = u.Host
http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect)
return
}
h.error(w, raft.ErrNotLeader, http.StatusInternalServerError)
}

View File

@ -6,6 +6,8 @@ curl -G http://localhost:8086/query --data-urlencode "q=CREATE RETENTION POLICY
echo "inserting data" echo "inserting data"
curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2015-01-26T22:01:11.703Z","values": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2015-01-26T22:01:11.703Z","values": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write
curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2015-01-27T22:01:11.703Z","values": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write
curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2015-01-28T22:01:11.703Z","values": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write
echo "querying data" echo "querying data"
curl -G http://localhost:8086/query --data-urlencode "db=foo" --data-urlencode "q=SELECT sum(value) FROM \"foo\".\"bar\".cpu GROUP BY time(1h)" curl -G http://localhost:8086/query --data-urlencode "db=foo" --data-urlencode "q=SELECT sum(value) FROM \"foo\".\"bar\".cpu GROUP BY time(1h)"