influxdb/messaging/client.go

398 lines
10 KiB
Go
Raw Normal View History

2014-10-21 02:42:03 +00:00
package messaging
2014-10-17 04:11:28 +00:00
import (
2014-10-24 00:54:12 +00:00
"bytes"
"encoding/json"
2014-10-17 04:11:28 +00:00
"errors"
2014-10-24 00:54:12 +00:00
"fmt"
"io/ioutil"
2014-11-13 05:32:42 +00:00
"log"
2014-10-17 04:11:28 +00:00
"math/rand"
"net/http"
"net/url"
2014-11-13 05:32:42 +00:00
"os"
2014-10-24 00:54:12 +00:00
"strconv"
2014-10-17 04:11:28 +00:00
"sync"
"time"
)
2014-10-17 15:53:10 +00:00
// DefaultReconnectTimeout is the default time to wait between when a broker
// stream disconnects and another connection is retried.
const DefaultReconnectTimeout = 100 * time.Millisecond
2014-10-17 04:11:28 +00:00
// ClientConfig represents the Client configuration that must be persisted
// across restarts.
type ClientConfig struct {
Brokers []*url.URL `json:"brokers"`
}
// NewClientConfig returns a new instance of ClientConfig.
func NewClientConfig(u []*url.URL) *ClientConfig {
return &ClientConfig{
Brokers: u,
}
}
2014-10-17 04:11:28 +00:00
// Client represents a client for the broker's HTTP API.
// Once opened, the client will stream down all messages that
type Client struct {
mu sync.Mutex
replicaID uint64 // the replica that the client is connecting as.
config ClientConfig // The Client state that must be persisted to disk.
2014-10-17 04:11:28 +00:00
opened bool
2014-10-17 15:53:10 +00:00
done chan chan struct{} // disconnection notification
2014-10-17 04:11:28 +00:00
// Channel streams messages from the broker.
2014-10-22 05:32:19 +00:00
c chan *Message
2014-10-17 15:53:10 +00:00
// The amount of time to wait before reconnecting to a broker stream.
ReconnectTimeout time.Duration
2014-11-13 05:32:42 +00:00
// The logging interface used by the client for out-of-band errors.
Logger *log.Logger
2014-10-17 04:11:28 +00:00
}
// NewClient returns a new instance of Client.
func NewClient(replicaID uint64) *Client {
2014-10-17 04:11:28 +00:00
return &Client{
replicaID: replicaID,
2014-10-17 15:53:10 +00:00
ReconnectTimeout: DefaultReconnectTimeout,
2014-11-13 05:32:42 +00:00
Logger: log.New(os.Stderr, "[messaging] ", log.LstdFlags),
2014-10-17 04:11:28 +00:00
}
}
// ReplicaID returns the replica id that the client was opened with.
func (c *Client) ReplicaID() uint64 { return c.replicaID }
2014-10-17 04:11:28 +00:00
2014-10-22 05:32:19 +00:00
// C returns streaming channel.
// Messages can be duplicated so it is important to check the index
// of the incoming message index to make sure it has not been processed.
func (c *Client) C() <-chan *Message { return c.c }
2014-10-17 04:11:28 +00:00
// URLs returns a list of broker URLs to connect to.
func (c *Client) URLs() []*url.URL {
c.mu.Lock()
defer c.mu.Unlock()
return c.config.Brokers
2014-10-17 04:11:28 +00:00
}
2014-10-24 00:54:12 +00:00
// LeaderURL returns the URL of the broker leader.
func (c *Client) LeaderURL() *url.URL {
c.mu.Lock()
defer c.mu.Unlock()
// TODO(benbjohnson): Actually keep track of the leader.
// HACK(benbjohnson): For testing, just grab a url.
return c.config.Brokers[0]
2014-10-24 00:54:12 +00:00
}
// Open initializes and opens the connection to the cluster. The
// URLs used to contact the cluster are either those supplied to
// the function, or if none are supplied, are read from the file
// at "path". These URLs do need to be URLs of actual Brokers.
// Regardless of URL source, at least 1 URL must be available
// for the client to be successfully opened.
func (c *Client) Open(path string, urls []*url.URL) error {
2014-10-17 04:11:28 +00:00
c.mu.Lock()
defer c.mu.Unlock()
// Return error if the client is already open.
if c.opened {
return ErrClientOpen
}
// Read URLs from file if no URLs are provided.
if len(urls) == 0 {
// Read URLs from config file. There is no guarantee
// that the Brokers URLs in the config file are still
// the Brokers, so we're going to double-check.
b, err := ioutil.ReadFile(path)
if os.IsNotExist(err) {
// nop
} else if err != nil {
return err
} else {
if err := json.Unmarshal(b, &c.config); err != nil {
return err
}
urls = c.config.Brokers
}
}
if len(urls) < 1 {
2014-10-17 04:11:28 +00:00
return ErrBrokerURLRequired
}
// Now that we have the seed URLs, actually use these to
// get the actual Broker URLs. Do that here.
c.config.Brokers = urls // Let's pretend they are the same
2014-10-17 04:11:28 +00:00
// Create a channel for streaming messages.
2014-10-22 05:32:19 +00:00
c.c = make(chan *Message, 0)
2014-10-17 04:11:28 +00:00
// Open the streamer if there's an ID set.
if c.replicaID != 0 {
c.done = make(chan chan struct{})
go c.streamer(c.done)
}
2014-10-17 04:11:28 +00:00
// Set open flag.
c.opened = true
return nil
}
// Close disconnects the client from the broker cluster.
func (c *Client) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
// Return error if the client is already closed.
if !c.opened {
return ErrClientClosed
}
2014-10-17 15:53:10 +00:00
// Shutdown streamer.
if c.done != nil {
ch := make(chan struct{})
c.done <- ch
<-ch
c.done = nil
}
2014-10-17 15:53:10 +00:00
2014-10-17 04:11:28 +00:00
// Close message stream.
2014-10-22 05:32:19 +00:00
close(c.c)
c.c = nil
2014-10-17 04:11:28 +00:00
// Unset open flag.
c.opened = false
return nil
}
2014-10-24 00:54:12 +00:00
// Publish sends a message to the broker and returns an index or error.
2014-11-11 05:25:03 +00:00
func (c *Client) Publish(m *Message) (uint64, error) {
2015-01-28 06:09:50 +00:00
var resp *http.Response
var err error
2014-10-24 00:54:12 +00:00
2015-01-28 06:09:50 +00:00
u := *c.LeaderURL()
for {
// Send the message to the messages endpoint.
u.Path = "/messaging/messages"
u.RawQuery = url.Values{
"type": {strconv.FormatUint(uint64(m.Type), 10)},
"topicID": {strconv.FormatUint(m.TopicID, 10)},
}.Encode()
resp, err = http.Post(u.String(), "application/octet-stream", bytes.NewReader(m.Data))
if err != nil {
return 0, err
}
defer resp.Body.Close()
// If a temporary redirect occurs then update the leader and retry.
// If a non-200 status is returned then an error occurred.
if resp.StatusCode == http.StatusTemporaryRedirect {
redirectURL, err := url.Parse(resp.Header.Get("Location"))
if err != nil {
return 0, fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
}
u = *redirectURL
continue
} else if resp.StatusCode != http.StatusOK {
if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" {
return 0, errors.New(errstr)
}
return 0, fmt.Errorf("cannot publish(%d)", resp.StatusCode)
} else {
break
}
2014-10-24 00:54:12 +00:00
}
// Parse broker index.
index, err := strconv.ParseUint(resp.Header.Get("X-Broker-Index"), 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid index: %s", err)
}
return index, nil
}
// CreateReplica creates a replica on the broker.
func (c *Client) CreateReplica(id uint64) error {
// Send request to the last known leader.
u := *c.LeaderURL()
u.Path = "/messaging/replicas"
u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode()
resp, err := http.Post(u.String(), "application/octet-stream", nil)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
// If a non-201 status is returned then an error occurred.
if resp.StatusCode != http.StatusCreated {
return errors.New(resp.Header.Get("X-Broker-Error"))
}
return nil
}
// DeleteReplica removes a replica on the broker.
func (c *Client) DeleteReplica(id uint64) error {
// Send request to the last known leader.
u := *c.LeaderURL()
u.Path = "/messaging/replicas"
u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode()
req, _ := http.NewRequest("DELETE", u.String(), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
// If a non-204 status is returned then an error occurred.
if resp.StatusCode != http.StatusNoContent {
return errors.New(resp.Header.Get("X-Broker-Error"))
}
2015-01-10 15:48:50 +00:00
return nil
}
// Subscribe subscribes a replica to a topic on the broker.
func (c *Client) Subscribe(replicaID, topicID uint64) error {
// Send request to the last known leader.
u := *c.LeaderURL()
u.Path = "/messaging/subscriptions"
u.RawQuery = url.Values{
"replicaID": {strconv.FormatUint(replicaID, 10)},
"topicID": {strconv.FormatUint(topicID, 10)},
}.Encode()
resp, err := http.Post(u.String(), "application/octet-stream", nil)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
// If a non-201 status is returned then an error occurred.
if resp.StatusCode != http.StatusCreated {
return errors.New(resp.Header.Get("X-Broker-Error"))
}
return nil
}
// Unsubscribe unsubscribes a replica from a topic on the broker.
func (c *Client) Unsubscribe(replicaID, topicID uint64) error {
// Send request to the last known leader.
u := *c.LeaderURL()
u.Path = "/messaging/subscriptions"
u.RawQuery = url.Values{
"replicaID": {strconv.FormatUint(replicaID, 10)},
"topicID": {strconv.FormatUint(topicID, 10)},
}.Encode()
req, _ := http.NewRequest("DELETE", u.String(), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
// If a non-204 status is returned then an error occurred.
if resp.StatusCode != http.StatusNoContent {
return errors.New(resp.Header.Get("X-Broker-Error"))
}
return nil
}
2014-10-17 04:11:28 +00:00
// streamer connects to a broker server and streams the replica's messages.
2014-10-17 15:53:10 +00:00
func (c *Client) streamer(done chan chan struct{}) {
2014-10-17 04:11:28 +00:00
for {
// Check for the client disconnection.
select {
2014-10-17 15:53:10 +00:00
case ch := <-done:
close(ch)
2014-10-17 04:11:28 +00:00
return
default:
}
// TODO: Validate that there is at least one broker URL.
// Choose a random broker URL.
urls := c.URLs()
u := *urls[rand.Intn(len(urls))]
// Connect to broker and stream.
u.Path = "/messaging/messages"
2014-10-17 04:11:28 +00:00
if err := c.streamFromURL(&u, done); err == errDone {
return
2014-11-13 05:32:42 +00:00
} else if err != nil {
c.Logger.Print(err)
2014-10-17 04:11:28 +00:00
}
}
}
// streamFromURL connects to a broker server and streams the replica's messages.
2014-10-17 15:53:10 +00:00
func (c *Client) streamFromURL(u *url.URL, done chan chan struct{}) error {
// Set the replica id on the URL and open the stream.
u.RawQuery = url.Values{"replicaID": {strconv.FormatUint(c.replicaID, 10)}}.Encode()
2014-10-17 04:11:28 +00:00
resp, err := http.Get(u.String())
if err != nil {
2014-10-17 15:53:10 +00:00
time.Sleep(c.ReconnectTimeout)
2014-10-17 04:11:28 +00:00
return nil
}
defer func() { _ = resp.Body.Close() }()
// Ensure that we received a 200 OK from the server before streaming.
if resp.StatusCode != http.StatusOK {
2014-10-17 15:53:10 +00:00
time.Sleep(c.ReconnectTimeout)
2015-01-28 06:09:50 +00:00
c.Logger.Printf("reconnecting to broker: %s (status=%d)", u, resp.StatusCode)
2014-10-17 15:53:10 +00:00
return nil
2014-10-17 04:11:28 +00:00
}
2015-01-28 06:09:50 +00:00
c.Logger.Printf("connected to broker: %s", u)
2014-10-17 15:53:10 +00:00
// Continuously decode messages from request body in a separate goroutine.
errNotify := make(chan error, 0)
go func() {
dec := NewMessageDecoder(resp.Body)
for {
// Decode message from the stream.
m := &Message{}
if err := dec.Decode(m); err != nil {
errNotify <- err
return
}
// TODO: Write broker set updates, do not passthrough to channel.
2014-10-17 15:53:10 +00:00
// Write message to streaming channel.
2014-10-22 05:32:19 +00:00
c.c <- m
2014-10-17 04:11:28 +00:00
}
2014-10-17 15:53:10 +00:00
}()
2014-10-17 04:11:28 +00:00
2014-10-17 15:53:10 +00:00
// Check for the client disconnect or error from the stream.
select {
case ch := <-done:
// Close body.
2014-11-13 05:32:42 +00:00
_ = resp.Body.Close()
2014-10-17 04:11:28 +00:00
2014-10-17 15:53:10 +00:00
// Clear message buffer.
2014-10-17 04:11:28 +00:00
select {
2014-10-22 05:32:19 +00:00
case <-c.c:
2014-10-17 04:11:28 +00:00
default:
}
2014-10-17 15:53:10 +00:00
// Notify the close function and return marker error.
close(ch)
return errDone
case err := <-errNotify:
return err
2014-10-17 04:11:28 +00:00
}
}
// marker error for the streamer.
var errDone = errors.New("done")